This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new baae15a [BEAM-11207] Metric Extraction via proto RPC API (#13272)
baae15a is described below
commit baae15a6e85033b7d32a38daf12bb03a7cc7fe74
Author: Kamil Wasilewski <[email protected]>
AuthorDate: Mon Nov 16 23:27:45 2020 +0100
[BEAM-11207] Metric Extraction via proto RPC API (#13272)
For the Go SDK.
---
sdks/go/pkg/beam/core/metrics/metrics.go | 118 +++++++++++++
.../go/pkg/beam/core/runtime/harness/monitoring.go | 187 +++------------------
.../beam/core/runtime/harness/monitoring_test.go | 33 ++--
sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go | 172 +++++++++++++++++++
.../beam/core/runtime/metricsx/metricsx_test.go | 166 ++++++++++++++++++
sdks/go/pkg/beam/core/runtime/metricsx/urns.go | 170 +++++++++++++++++++
sdks/go/pkg/beam/doc_test.go | 2 +-
sdks/go/pkg/beam/io/textio/sdf_test.go | 2 +-
sdks/go/pkg/beam/pipeline.go | 6 +
sdks/go/pkg/beam/runner.go | 6 +-
sdks/go/pkg/beam/runners/dataflow/dataflow.go | 24 +--
sdks/go/pkg/beam/runners/direct/direct.go | 16 +-
sdks/go/pkg/beam/runners/dot/dot.go | 10 +-
sdks/go/pkg/beam/runners/flink/flink.go | 2 +-
sdks/go/pkg/beam/runners/spark/spark.go | 2 +-
.../beam/runners/universal/runnerlib/execute.go | 48 +++++-
sdks/go/pkg/beam/runners/universal/universal.go | 20 +--
sdks/go/pkg/beam/runners/vet/vet.go | 8 +-
sdks/go/pkg/beam/testing/ptest/ptest.go | 3 +-
sdks/go/pkg/beam/x/beamx/run.go | 8 +
20 files changed, 770 insertions(+), 233 deletions(-)
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go
b/sdks/go/pkg/beam/core/metrics/metrics.go
index 5b14c4a..33b8ef2 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -381,6 +381,11 @@ func (m *distribution) get() (count, sum, min, max int64) {
return m.count, m.sum, m.min, m.max
}
+// DistributionValue is the value of a Distribution metric.
+type DistributionValue struct {
+ Count, Sum, Min, Max int64
+}
+
// Gauge is a time, value pair metric.
type Gauge struct {
name name
@@ -448,3 +453,116 @@ func (m *gauge) get() (int64, time.Time) {
defer m.mu.Unlock()
return m.v, m.t
}
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+ Value int64
+ Timestamp time.Time
+}
+
+// Results represents all metrics gathered during the job's execution.
+// It allows for querying metrics using a provided filter.
+type Results struct {
+ counters []CounterResult
+ distributions []DistributionResult
+ gauges []GaugeResult
+}
+
+// NewResults creates a new Results.
+func NewResults(
+ counters []CounterResult,
+ distributions []DistributionResult,
+ gauges []GaugeResult) *Results {
+ return &Results{counters, distributions, gauges}
+}
+
+// AllMetrics returns all metrics from a Results instance.
+func (mr Results) AllMetrics() QueryResults {
+ return QueryResults{mr.counters, mr.distributions, mr.gauges}
+}
+
+// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
+
+// QueryResults is the result of a query. Allows accessing all of the
+// metrics that matched the filter.
+type QueryResults struct {
+ counters []CounterResult
+ distributions []DistributionResult
+ gauges []GaugeResult
+}
+
+// Counters returns a slice of counter metrics.
+func (qr QueryResults) Counters() []CounterResult {
+ out := make([]CounterResult, len(qr.counters))
+ copy(out, qr.counters)
+ return out
+}
+
+// Distributions returns a slice of distribution metrics.
+func (qr QueryResults) Distributions() []DistributionResult {
+ out := make([]DistributionResult, len(qr.distributions))
+ copy(out, qr.distributions)
+ return out
+}
+
+// Gauges returns a slice of gauge metrics.
+func (qr QueryResults) Gauges() []GaugeResult {
+ out := make([]GaugeResult, len(qr.gauges))
+ copy(out, qr.gauges)
+ return out
+}
+
+// CounterResult is an attempted and a committed value of a counter metric plus
+// key.
+type CounterResult struct {
+ Attempted, Committed int64
+ Key StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r CounterResult) Result() int64 {
+ if r.Committed != 0 {
+ return r.Committed
+ }
+ return r.Attempted
+}
+
+// DistributionResult is an attempted and a committed value of a distribution
+// metric plus key.
+type DistributionResult struct {
+ Attempted, Committed DistributionValue
+ Key StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r DistributionResult) Result() DistributionValue {
+ empty := DistributionValue{}
+ if r.Committed != empty {
+ return r.Committed
+ }
+ return r.Attempted
+}
+
+// GaugeResult is an attempted and a committed value of a gauge metric plus
+// key.
+type GaugeResult struct {
+ Attempted, Committed GaugeValue
+ Key StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r GaugeResult) Result() GaugeValue {
+ empty := GaugeValue{}
+ if r.Committed != empty {
+ return r.Committed
+ }
+ return r.Attempted
+}
+
+// StepKey uniquely identifies a metric within a pipeline graph.
+type StepKey struct {
+ Step, Name, Namespace string
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index 53419ce..c9ddb80 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -16,121 +16,20 @@
package harness
import (
- "bytes"
"strconv"
"sync"
"sync/atomic"
"time"
- "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
- "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)
-type mUrn uint32
-
-// TODO: Pull these from the protos.
-var sUrns = [...]string{
- "beam:metric:user:sum_int64:v1",
- "beam:metric:user:sum_double:v1",
- "beam:metric:user:distribution_int64:v1",
- "beam:metric:user:distribution_double:v1",
- "beam:metric:user:latest_int64:v1",
- "beam:metric:user:latest_double:v1",
- "beam:metric:user:top_n_int64:v1",
- "beam:metric:user:top_n_double:v1",
- "beam:metric:user:bottom_n_int64:v1",
- "beam:metric:user:bottom_n_double:v1",
-
- "beam:metric:element_count:v1",
- "beam:metric:sampled_byte_size:v1",
-
- "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
- "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
- "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
- "beam:metric:ptransform_execution_time:total_msecs:v1",
-
- "beam:metric:ptransform_progress:remaining:v1",
- "beam:metric:ptransform_progress:completed:v1",
- "beam:metric:data_channel:read_index:v1",
-
- "TestingSentinelUrn", // Must remain last.
-}
-
-const (
- urnUserSumInt64 mUrn = iota
- urnUserSumFloat64
- urnUserDistInt64
- urnUserDistFloat64
- urnUserLatestMsInt64
- urnUserLatestMsFloat64
- urnUserTopNInt64
- urnUserTopNFloat64
- urnUserBottomNInt64
- urnUserBottomNFloat64
-
- urnElementCount
- urnSampledByteSize
-
- urnStartBundle
- urnProcessBundle
- urnFinishBundle
- urnTransformTotalTime
-
- urnProgressRemaining
- urnProgressCompleted
- urnDataChannelReadIndex
-
- urnTestSentinel // Must remain last.
-)
-
-// urnToType maps the urn to it's encoding type.
-// This function is written to be inlinable by the compiler.
-func urnToType(u mUrn) string {
- switch u {
- case urnUserSumInt64, urnElementCount, urnStartBundle,
urnProcessBundle, urnFinishBundle, urnTransformTotalTime:
- return "beam:metrics:sum_int64:v1"
- case urnUserSumFloat64:
- return "beam:metrics:sum_double:v1"
- case urnUserDistInt64, urnSampledByteSize:
- return "beam:metrics:distribution_int64:v1"
- case urnUserDistFloat64:
- return "beam:metrics:distribution_double:v1"
- case urnUserLatestMsInt64:
- return "beam:metrics:latest_int64:v1"
- case urnUserLatestMsFloat64:
- return "beam:metrics:latest_double:v1"
- case urnUserTopNInt64:
- return "beam:metrics:top_n_int64:v1"
- case urnUserTopNFloat64:
- return "beam:metrics:top_n_double:v1"
- case urnUserBottomNInt64:
- return "beam:metrics:bottom_n_int64:v1"
- case urnUserBottomNFloat64:
- return "beam:metrics:bottom_n_double:v1"
-
- case urnProgressRemaining, urnProgressCompleted:
- return "beam:metrics:progress:v1"
- case urnDataChannelReadIndex:
- return "beam:metrics:sum_int64:v1"
-
- // Monitoring Table isn't currently in the protos.
- // case ???:
- // return "beam:metrics:monitoring_table:v1"
-
- case urnTestSentinel:
- return "TestingSentinelType"
-
- default:
- panic("metric urn without specified type" + sUrns[u])
- }
-}
-
type shortKey struct {
metrics.Labels
- Urn mUrn // Urns fully specify their type.
+ Urn metricsx.Urn // Urns fully specify their type.
}
// shortIDCache retains lookup caches for short ids to the full monitoring
@@ -162,7 +61,7 @@ func (c *shortIDCache) getNextShortID() string {
// getShortID returns the short id for the given metric, and if
// it doesn't exist yet, stores the metadata.
// Assumes c.mu lock is held.
-func (c *shortIDCache) getShortID(l metrics.Labels, urn mUrn) string {
+func (c *shortIDCache) getShortID(l metrics.Labels, urn metricsx.Urn) string {
k := shortKey{l, urn}
s, ok := c.labels2ShortIds[k]
if ok {
@@ -171,8 +70,8 @@ func (c *shortIDCache) getShortID(l metrics.Labels, urn
mUrn) string {
s = c.getNextShortID()
c.labels2ShortIds[k] = s
c.shortIds2Infos[s] = &pipepb.MonitoringInfo{
- Urn: sUrns[urn],
- Type: urnToType(urn),
+ Urn: metricsx.UrnToString(urn),
+ Type: metricsx.UrnToType(urn),
Labels: userLabels(l),
}
return s
@@ -195,7 +94,7 @@ func init() {
defaultShortIDCache = newShortIDCache()
}
-func getShortID(l metrics.Labels, urn mUrn) string {
+func getShortID(l metrics.Labels, urn metricsx.Urn) string {
return defaultShortIDCache.getShortID(l, urn)
}
@@ -216,46 +115,46 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo,
map[string][]byte) {
payloads := make(map[string][]byte)
metrics.Extractor{
SumInt64: func(l metrics.Labels, v int64) {
- payload, err := int64Counter(v)
+ payload, err := metricsx.Int64Counter(v)
if err != nil {
panic(err)
}
- payloads[getShortID(l, urnUserSumInt64)] = payload
+ payloads[getShortID(l, metricsx.UrnUserSumInt64)] =
payload
monitoringInfo = append(monitoringInfo,
&pipepb.MonitoringInfo{
- Urn: sUrns[urnUserSumInt64],
- Type: urnToType(urnUserSumInt64),
+ Urn:
metricsx.UrnToString(metricsx.UrnUserSumInt64),
+ Type:
metricsx.UrnToType(metricsx.UrnUserSumInt64),
Labels: userLabels(l),
Payload: payload,
})
},
DistributionInt64: func(l metrics.Labels, count, sum, min, max
int64) {
- payload, err := int64Distribution(count, sum, min, max)
+ payload, err := metricsx.Int64Distribution(count, sum,
min, max)
if err != nil {
panic(err)
}
- payloads[getShortID(l, urnUserDistInt64)] = payload
+ payloads[getShortID(l, metricsx.UrnUserDistInt64)] =
payload
monitoringInfo = append(monitoringInfo,
&pipepb.MonitoringInfo{
- Urn: sUrns[urnUserDistInt64],
- Type: urnToType(urnUserDistInt64),
+ Urn:
metricsx.UrnToString(metricsx.UrnUserDistInt64),
+ Type:
metricsx.UrnToType(metricsx.UrnUserDistInt64),
Labels: userLabels(l),
Payload: payload,
})
},
GaugeInt64: func(l metrics.Labels, v int64, t time.Time) {
- payload, err := int64Latest(t, v)
+ payload, err := metricsx.Int64Latest(t, v)
if err != nil {
panic(err)
}
- payloads[getShortID(l, urnUserLatestMsInt64)] = payload
+ payloads[getShortID(l, metricsx.UrnUserLatestMsInt64)]
= payload
monitoringInfo = append(monitoringInfo,
&pipepb.MonitoringInfo{
- Urn: sUrns[urnUserLatestMsInt64],
- Type:
urnToType(urnUserLatestMsInt64),
+ Urn:
metricsx.UrnToString(metricsx.UrnUserLatestMsInt64),
+ Type:
metricsx.UrnToType(metricsx.UrnUserLatestMsInt64),
Labels: userLabels(l),
Payload: payload,
})
@@ -265,28 +164,28 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo,
map[string][]byte) {
// Get the execution monitoring information from the bundle plan.
if snapshot, ok := p.Progress(); ok {
- payload, err := int64Counter(snapshot.Count)
+ payload, err := metricsx.Int64Counter(snapshot.Count)
if err != nil {
panic(err)
}
// TODO(BEAM-9934): This metric should account for elements in
multiple windows.
- payloads[getShortID(metrics.PCollectionLabels(snapshot.PID),
urnElementCount)] = payload
+ payloads[getShortID(metrics.PCollectionLabels(snapshot.PID),
metricsx.UrnElementCount)] = payload
monitoringInfo = append(monitoringInfo,
&pipepb.MonitoringInfo{
- Urn: sUrns[urnElementCount],
- Type: urnToType(urnElementCount),
+ Urn:
metricsx.UrnToString(metricsx.UrnElementCount),
+ Type:
metricsx.UrnToType(metricsx.UrnElementCount),
Labels: map[string]string{
"PCOLLECTION": snapshot.PID,
},
Payload: payload,
})
- payloads[getShortID(metrics.PTransformLabels(snapshot.ID),
urnDataChannelReadIndex)] = payload
+ payloads[getShortID(metrics.PTransformLabels(snapshot.ID),
metricsx.UrnDataChannelReadIndex)] = payload
monitoringInfo = append(monitoringInfo,
&pipepb.MonitoringInfo{
- Urn: sUrns[urnDataChannelReadIndex],
- Type: urnToType(urnDataChannelReadIndex),
+ Urn:
metricsx.UrnToString(metricsx.UrnDataChannelReadIndex),
+ Type:
metricsx.UrnToType(metricsx.UrnDataChannelReadIndex),
Labels: map[string]string{
"PTRANSFORM": snapshot.ID,
},
@@ -305,39 +204,3 @@ func userLabels(l metrics.Labels) map[string]string {
"NAME": l.Name(),
}
}
-
-func int64Counter(v int64) ([]byte, error) {
- var buf bytes.Buffer
- if err := coder.EncodeVarInt(v, &buf); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
-}
-
-func int64Latest(t time.Time, v int64) ([]byte, error) {
- var buf bytes.Buffer
- if err := coder.EncodeVarInt(mtime.FromTime(t).Milliseconds(), &buf);
err != nil {
- return nil, err
- }
- if err := coder.EncodeVarInt(v, &buf); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
-}
-
-func int64Distribution(count, sum, min, max int64) ([]byte, error) {
- var buf bytes.Buffer
- if err := coder.EncodeVarInt(count, &buf); err != nil {
- return nil, err
- }
- if err := coder.EncodeVarInt(sum, &buf); err != nil {
- return nil, err
- }
- if err := coder.EncodeVarInt(min, &buf); err != nil {
- return nil, err
- }
- if err := coder.EncodeVarInt(max, &buf); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
-}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
b/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
index 456fce4..ed792dd 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring_test.go
@@ -20,34 +20,35 @@ import (
"testing"
"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
)
func TestGetShortID(t *testing.T) {
tests := []struct {
id string
- urn mUrn
+ urn metricsx.Urn
labels metrics.Labels
expectedUrn string
expectedType string
}{
{
id: "1",
- urn: urnUserDistInt64,
+ urn: metricsx.UrnUserDistInt64,
expectedUrn: "beam:metric:user:distribution_int64:v1",
expectedType: "beam:metrics:distribution_int64:v1",
}, {
id: "2",
- urn: urnElementCount,
+ urn: metricsx.UrnElementCount,
expectedUrn: "beam:metric:element_count:v1",
expectedType: "beam:metrics:sum_int64:v1",
}, {
id: "3",
- urn: urnProgressCompleted,
+ urn: metricsx.UrnProgressCompleted,
expectedUrn:
"beam:metric:ptransform_progress:completed:v1",
expectedType: "beam:metrics:progress:v1",
}, {
id: "4",
- urn: urnUserDistFloat64,
+ urn: metricsx.UrnUserDistFloat64,
expectedUrn: "beam:metric:user:distribution_double:v1",
expectedType: "beam:metrics:distribution_double:v1",
}, {
@@ -56,25 +57,25 @@ func TestGetShortID(t *testing.T) {
// in the list, or vice versa, this should fail with
either
// an index out of range panic or a mismatch.
id: "5",
- urn: urnTestSentinel,
+ urn: metricsx.UrnTestSentinel,
expectedUrn: "TestingSentinelUrn",
expectedType: "TestingSentinelType",
}, {
id: "6",
- urn: urnFinishBundle,
+ urn: metricsx.UrnFinishBundle,
expectedUrn:
"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
expectedType: "beam:metrics:sum_int64:v1",
}, {
// This case and the next one validates that different
labels
// with the same urn are in fact assigned different
short ids.
id: "7",
- urn: urnUserSumInt64,
+ urn: metricsx.UrnUserSumInt64,
labels: metrics.UserLabels("myT", "harness",
"metricNumber7"),
expectedUrn: "beam:metric:user:sum_int64:v1",
expectedType: "beam:metrics:sum_int64:v1",
}, {
id: "8",
- urn: urnUserSumInt64,
+ urn: metricsx.UrnUserSumInt64,
labels: metrics.UserLabels("myT", "harness",
"metricNumber8"),
expectedUrn: "beam:metric:user:sum_int64:v1",
expectedType: "beam:metrics:sum_int64:v1",
@@ -84,13 +85,13 @@ func TestGetShortID(t *testing.T) {
// user metrics are unique per label set, but this
isn't the layer
// to validate that condition.
id: "9",
- urn: urnUserTopNFloat64,
+ urn: metricsx.UrnUserTopNFloat64,
labels: metrics.UserLabels("myT", "harness",
"metricNumber7"),
expectedUrn: "beam:metric:user:top_n_double:v1",
expectedType: "beam:metrics:top_n_double:v1",
}, {
id: "a",
- urn: urnElementCount,
+ urn: metricsx.UrnElementCount,
labels: metrics.PCollectionLabels("myPCol"),
expectedUrn: "beam:metric:element_count:v1",
expectedType: "beam:metrics:sum_int64:v1",
@@ -133,7 +134,7 @@ func TestGetShortID(t *testing.T) {
// is initialized properly.
func TestShortIdCache_Default(t *testing.T) {
defaultShortIDCache.mu.Lock()
- s := getShortID(metrics.UserLabels("this", "doesn't", "matter"),
urnTestSentinel)
+ s := getShortID(metrics.UserLabels("this", "doesn't", "matter"),
metricsx.UrnTestSentinel)
defaultShortIDCache.mu.Unlock()
info := shortIdsToInfos([]string{s})[s]
@@ -148,11 +149,11 @@ func TestShortIdCache_Default(t *testing.T) {
func BenchmarkGetShortID(b *testing.B) {
b.Run("new", func(b *testing.B) {
l := metrics.UserLabels("this", "doesn't",
strconv.FormatInt(-1, 36))
- last := getShortID(l, urnTestSentinel)
+ last := getShortID(l, metricsx.UrnTestSentinel)
for i := int64(0); i < int64(b.N); i++ {
// Ensure it's allocated to the stack.
l = metrics.UserLabels("this", "doesn't",
strconv.FormatInt(i, 36))
- got := getShortID(l, urnTestSentinel)
+ got := getShortID(l, metricsx.UrnTestSentinel)
if got == last {
b.Fatalf("short collision: at %s", got)
}
@@ -162,9 +163,9 @@ func BenchmarkGetShortID(b *testing.B) {
b.Run("amortized", func(b *testing.B) {
l := metrics.UserLabels("this", "doesn't", "matter")
c := newShortIDCache()
- want := c.getShortID(l, urnTestSentinel)
+ want := c.getShortID(l, metricsx.UrnTestSentinel)
for i := 0; i < b.N; i++ {
- got := c.getShortID(l, urnTestSentinel)
+ got := c.getShortID(l, metricsx.UrnTestSentinel)
if got != want {
b.Fatalf("different short ids: got %s, want
%s", got, want)
}
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
new file mode 100644
index 0000000..6cd10b4
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromMonitoringInfos extracts metrics from monitored states and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed
[]*pipepb.MonitoringInfo) *metrics.Results {
+ ac, ad, ag := groupByType(attempted)
+ cc, cd, cg := groupByType(committed)
+
+ return metrics.NewResults(mergeCounters(ac, cc), mergeDistributions(ad,
cd), mergeGauges(ag, cg))
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[metrics.StepKey]int64,
+ map[metrics.StepKey]metrics.DistributionValue,
+ map[metrics.StepKey]metrics.GaugeValue) {
+ counters := make(map[metrics.StepKey]int64)
+ distributions := make(map[metrics.StepKey]metrics.DistributionValue)
+ gauges := make(map[metrics.StepKey]metrics.GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ switch minfo.GetType() {
+ case "beam:metrics:sum_int64:v1":
+ value, err := extractCounterValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ counters[key] = value
+ case "beam:metrics:distribution_int64:v1":
+ value, err := extractDistributionValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ distributions[key] = value
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ value, err := extractGaugeValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ gauges[key] = value
+ default:
+ log.Println("unknown metric type")
+ }
+ }
+ return counters, distributions, gauges
+}
+
+func mergeCounters(
+ attempted map[metrics.StepKey]int64,
+ committed map[metrics.StepKey]int64) []metrics.CounterResult {
+ res := make([]metrics.CounterResult, 0)
+
+ for k := range attempted {
+ v := committed[k]
+ res = append(res, metrics.CounterResult{Attempted:
attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func mergeDistributions(
+ attempted map[metrics.StepKey]metrics.DistributionValue,
+ committed map[metrics.StepKey]metrics.DistributionValue)
[]metrics.DistributionResult {
+ res := make([]metrics.DistributionResult, 0)
+
+ for k := range attempted {
+ v := committed[k]
+ res = append(res, metrics.DistributionResult{Attempted:
attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func mergeGauges(
+ attempted map[metrics.StepKey]metrics.GaugeValue,
+ committed map[metrics.StepKey]metrics.GaugeValue) []metrics.GaugeResult
{
+ res := make([]metrics.GaugeResult, 0)
+
+ for k := range attempted {
+ v := committed[k]
+ res = append(res, metrics.GaugeResult{Attempted: attempted[k],
Committed: v, Key: k})
+ }
+ return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
+ labels := newLabels(mi.GetLabels())
+ stepName := labels.Transform()
+ if stepName == "" {
+ return metrics.StepKey{}, fmt.Errorf("Failed to deduce Step
from MonitoringInfo: %v", mi)
+ }
+ return metrics.StepKey{Step: stepName, Name: labels.Name(), Namespace:
labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+ value, err := coder.DecodeVarInt(reader)
+ if err != nil {
+ return -1, err
+ }
+ return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader)
(metrics.DistributionValue, error) {
+ values, err := decodeMany(reader, 4)
+ if err != nil {
+ return metrics.DistributionValue{}, err
+ }
+ return metrics.DistributionValue{Count: values[0], Sum: values[1], Min:
values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+ values, err := decodeMany(reader, 2)
+ if err != nil {
+ return metrics.GaugeValue{}, err
+ }
+ return metrics.GaugeValue{Timestamp: time.Unix(0,
values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+ labels := metrics.UserLabels(miLabels["PTRANSFORM"],
miLabels["NAMESPACE"], miLabels["NAME"])
+ return &labels
+}
+
+func decodeMany(reader *bytes.Reader, size int) ([]int64, error) {
+ var err error
+ values := make([]int64, size)
+
+ for i := 0; i < size; i++ {
+ values[i], err = coder.DecodeVarInt(reader)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return values, err
+}
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
new file mode 100644
index 0000000..83add87
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+ "testing"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/google/go-cmp/cmp"
+)
+
+func TestFromMonitoringInfos_Counters(t *testing.T) {
+ var value int64 = 15
+ want := metrics.CounterResult{
+ Attempted: 15,
+ Committed: 0,
+ Key: metrics.StepKey{
+ Step: "main.customDoFn",
+ Name: "customCounter",
+ Namespace: "customDoFn",
+ }}
+
+ payload, err := Int64Counter(value)
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Counter: %v", err)
+ }
+
+ labels := map[string]string{
+ "PTRANSFORM": "main.customDoFn",
+ "NAMESPACE": "customDoFn",
+ "NAME": "customCounter",
+ }
+
+ mInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(UrnUserSumInt64),
+ Type: UrnToType(UrnUserSumInt64),
+ Labels: labels,
+ Payload: payload,
+ }
+
+ attempted := []*pipepb.MonitoringInfo{mInfo}
+ committed := []*pipepb.MonitoringInfo{}
+
+ got := FromMonitoringInfos(attempted, committed).AllMetrics().Counters()
+ size := len(got)
+ if size < 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+ }
+ if d := cmp.Diff(want, got[0]); d != "" {
+ t.Fatalf("Invalid counter: got: %v, want: %v,
diff(-want,+got):\n %v",
+ got[0], want, d)
+ }
+}
+
+func TestFromMonitoringInfos_Distributions(t *testing.T) {
+ var count, sum, min, max int64 = 100, 5, -12, 30
+
+ want := metrics.DistributionResult{
+ Attempted: metrics.DistributionValue{
+ Count: 100,
+ Sum: 5,
+ Min: -12,
+ Max: 30,
+ },
+ Committed: metrics.DistributionValue{},
+ Key: metrics.StepKey{
+ Step: "main.customDoFn",
+ Name: "customDist",
+ Namespace: "customDoFn",
+ }}
+
+ payload, err := Int64Distribution(count, sum, min, max)
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Distribution: %v", err)
+ }
+
+ labels := map[string]string{
+ "PTRANSFORM": "main.customDoFn",
+ "NAMESPACE": "customDoFn",
+ "NAME": "customDist",
+ }
+
+ mInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(UrnUserDistInt64),
+ Type: UrnToType(UrnUserDistInt64),
+ Labels: labels,
+ Payload: payload,
+ }
+
+ attempted := []*pipepb.MonitoringInfo{mInfo}
+ committed := []*pipepb.MonitoringInfo{}
+
+ got := FromMonitoringInfos(attempted,
committed).AllMetrics().Distributions()
+ size := len(got)
+ if size < 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+ }
+ if d := cmp.Diff(want, got[0]); d != "" {
+ t.Fatalf("Invalid distribution: got: %v, want: %v,
diff(-want,+got):\n %v",
+ got[0], want, d)
+ }
+}
+
+func TestFromMonitoringInfos_Gauges(t *testing.T) {
+ var value int64 = 100
+ loc, _ := time.LoadLocation("Local")
+ tm := time.Date(2020, 11, 9, 17, 52, 28, 462*int(time.Millisecond), loc)
+
+ want := metrics.GaugeResult{
+ Attempted: metrics.GaugeValue{
+ Value: 100,
+ Timestamp: tm,
+ },
+ Committed: metrics.GaugeValue{},
+ Key: metrics.StepKey{
+ Step: "main.customDoFn",
+ Name: "customGauge",
+ Namespace: "customDoFn",
+ }}
+
+ payload, err := Int64Latest(tm, value)
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Latest: %v", err)
+ }
+
+ labels := map[string]string{
+ "PTRANSFORM": "main.customDoFn",
+ "NAMESPACE": "customDoFn",
+ "NAME": "customGauge",
+ }
+
+ mInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(UrnUserLatestMsInt64),
+ Type: UrnToType(UrnUserLatestMsInt64),
+ Labels: labels,
+ Payload: payload,
+ }
+
+ attempted := []*pipepb.MonitoringInfo{mInfo}
+ committed := []*pipepb.MonitoringInfo{}
+
+ got := FromMonitoringInfos(attempted, committed).AllMetrics().Gauges()
+ size := len(got)
+ if size < 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+ }
+ if d := cmp.Diff(want, got[0]); d != "" {
+ t.Fatalf("Invalid gauge: got: %v, want: %v, diff(-want,+got):\n
%v",
+ got[0], want, d)
+ }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/urns.go
b/sdks/go/pkg/beam/core/runtime/metricsx/urns.go
new file mode 100644
index 0000000..67aed96
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/urns.go
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricsx
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+)
+
+// Urn is an enum type for representing urns of metrics and monitored states.
+type Urn uint32
+
+// TODO: Pull these from the protos.
+var sUrns = [...]string{
+ "beam:metric:user:sum_int64:v1",
+ "beam:metric:user:sum_double:v1",
+ "beam:metric:user:distribution_int64:v1",
+ "beam:metric:user:distribution_double:v1",
+ "beam:metric:user:latest_int64:v1",
+ "beam:metric:user:latest_double:v1",
+ "beam:metric:user:top_n_int64:v1",
+ "beam:metric:user:top_n_double:v1",
+ "beam:metric:user:bottom_n_int64:v1",
+ "beam:metric:user:bottom_n_double:v1",
+
+ "beam:metric:element_count:v1",
+ "beam:metric:sampled_byte_size:v1",
+
+ "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+ "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+ "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+ "beam:metric:ptransform_execution_time:total_msecs:v1",
+
+ "beam:metric:ptransform_progress:remaining:v1",
+ "beam:metric:ptransform_progress:completed:v1",
+ "beam:metric:data_channel:read_index:v1",
+
+ "TestingSentinelUrn", // Must remain last.
+}
+
+// The supported urns of metrics and monitored states.
+const (
+ UrnUserSumInt64 Urn = iota
+ UrnUserSumFloat64
+ UrnUserDistInt64
+ UrnUserDistFloat64
+ UrnUserLatestMsInt64
+ UrnUserLatestMsFloat64
+ UrnUserTopNInt64
+ UrnUserTopNFloat64
+ UrnUserBottomNInt64
+ UrnUserBottomNFloat64
+
+ UrnElementCount
+ UrnSampledByteSize
+
+ UrnStartBundle
+ UrnProcessBundle
+ UrnFinishBundle
+ UrnTransformTotalTime
+
+ UrnProgressRemaining
+ UrnProgressCompleted
+ UrnDataChannelReadIndex
+
+ UrnTestSentinel // Must remain last.
+)
+
+// UrnToString returns a string representation of the urn.
+func UrnToString(u Urn) string {
+ return sUrns[u]
+}
+
+// UrnToType maps the urn to its encoding type.
+// This function is written to be inlinable by the compiler.
+func UrnToType(u Urn) string {
+ switch u {
+ case UrnUserSumInt64, UrnElementCount, UrnStartBundle,
UrnProcessBundle, UrnFinishBundle, UrnTransformTotalTime:
+ return "beam:metrics:sum_int64:v1"
+ case UrnUserSumFloat64:
+ return "beam:metrics:sum_double:v1"
+ case UrnUserDistInt64, UrnSampledByteSize:
+ return "beam:metrics:distribution_int64:v1"
+ case UrnUserDistFloat64:
+ return "beam:metrics:distribution_double:v1"
+ case UrnUserLatestMsInt64:
+ return "beam:metrics:latest_int64:v1"
+ case UrnUserLatestMsFloat64:
+ return "beam:metrics:latest_double:v1"
+ case UrnUserTopNInt64:
+ return "beam:metrics:top_n_int64:v1"
+ case UrnUserTopNFloat64:
+ return "beam:metrics:top_n_double:v1"
+ case UrnUserBottomNInt64:
+ return "beam:metrics:bottom_n_int64:v1"
+ case UrnUserBottomNFloat64:
+ return "beam:metrics:bottom_n_double:v1"
+
+ case UrnProgressRemaining, UrnProgressCompleted:
+ return "beam:metrics:progress:v1"
+ case UrnDataChannelReadIndex:
+ return "beam:metrics:sum_int64:v1"
+
+ // Monitoring Table isn't currently in the protos.
+ // case ???:
+ // return "beam:metrics:monitoring_table:v1"
+
+ case UrnTestSentinel:
+ return "TestingSentinelType"
+
+ default:
+ panic("metric urn without specified type" + sUrns[u])
+ }
+}
+
+// Int64Counter returns an encoded payload of the integer counter.
+func Int64Counter(v int64) ([]byte, error) {
+ var buf bytes.Buffer
+ if err := coder.EncodeVarInt(v, &buf); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+// Int64Latest returns an encoded payload of the latest seen integer value.
+func Int64Latest(t time.Time, v int64) ([]byte, error) {
+ var buf bytes.Buffer
+ if err := coder.EncodeVarInt(mtime.FromTime(t).Milliseconds(), &buf);
err != nil {
+ return nil, err
+ }
+ if err := coder.EncodeVarInt(v, &buf); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+// Int64Distribution returns an encoded payload of the distribution of an
+// integer value.
+func Int64Distribution(count, sum, min, max int64) ([]byte, error) {
+ var buf bytes.Buffer
+ if err := coder.EncodeVarInt(count, &buf); err != nil {
+ return nil, err
+ }
+ if err := coder.EncodeVarInt(sum, &buf); err != nil {
+ return nil, err
+ }
+ if err := coder.EncodeVarInt(min, &buf); err != nil {
+ return nil, err
+ }
+ if err := coder.EncodeVarInt(max, &buf); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
diff --git a/sdks/go/pkg/beam/doc_test.go b/sdks/go/pkg/beam/doc_test.go
index 92a2b03..6c27980 100644
--- a/sdks/go/pkg/beam/doc_test.go
+++ b/sdks/go/pkg/beam/doc_test.go
@@ -74,7 +74,7 @@ func Example_gettingStarted() {
// pipeline can be executed by a PipelineRunner. The direct runner
executes the
// transforms directly, sequentially, in this one process, which is
useful for
// unit tests and simple experiments:
- if err := direct.Execute(context.Background(), p); err != nil {
+ if _, err := direct.Execute(context.Background(), p); err != nil {
fmt.Printf("Pipeline failed: %v", err)
}
}
diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go
b/sdks/go/pkg/beam/io/textio/sdf_test.go
index 05a26fa..733d79a 100644
--- a/sdks/go/pkg/beam/io/textio/sdf_test.go
+++ b/sdks/go/pkg/beam/io/textio/sdf_test.go
@@ -33,7 +33,7 @@ func TestReadSdf(t *testing.T) {
lines := ReadSdf(s, f)
passert.Count(s, lines, "NumLines", 1)
- if err := beam.Run(context.Background(), "direct", p); err != nil {
+ if _, err := beam.Run(context.Background(), "direct", p); err != nil {
t.Fatalf("Failed to execute job: %v", err)
}
}
diff --git a/sdks/go/pkg/beam/pipeline.go b/sdks/go/pkg/beam/pipeline.go
index 0c70463..26087d4 100644
--- a/sdks/go/pkg/beam/pipeline.go
+++ b/sdks/go/pkg/beam/pipeline.go
@@ -17,6 +17,7 @@ package beam
import (
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
)
// Scope is a hierarchical grouping for composite transforms. Scopes can be
@@ -85,3 +86,8 @@ func (p *Pipeline) Build() ([]*graph.MultiEdge,
[]*graph.Node, error) {
func (p *Pipeline) String() string {
return p.real.String()
}
+
+// PipelineResult is the result of beamx.RunWithMetrics.
+type PipelineResult interface {
+ Metrics() metrics.Results
+}
diff --git a/sdks/go/pkg/beam/runner.go b/sdks/go/pkg/beam/runner.go
index 28563bd..1b6b41f 100644
--- a/sdks/go/pkg/beam/runner.go
+++ b/sdks/go/pkg/beam/runner.go
@@ -27,12 +27,12 @@ import (
// verification, but require that it is stored in Init and used for Run.
var (
- runners = make(map[string]func(ctx context.Context, p *Pipeline) error)
+ runners = make(map[string]func(ctx context.Context, p *Pipeline)
(PipelineResult, error))
)
// RegisterRunner associates the name with the supplied runner, making it
available
// to execute a pipeline via Run.
-func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline)
error) {
+func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline)
(PipelineResult, error)) {
if _, ok := runners[name]; ok {
panic(fmt.Sprintf("runner %v already defined", name))
}
@@ -42,7 +42,7 @@ func RegisterRunner(name string, fn func(ctx context.Context,
p *Pipeline) error
// Run executes the pipeline using the selected registered runner. It is
customary
// to define a "runner" with no default as a flag to let users control runner
// selection.
-func Run(ctx context.Context, runner string, p *Pipeline) error {
+func Run(ctx context.Context, runner string, p *Pipeline) (PipelineResult,
error) {
fn, ok := runners[runner]
if !ok {
log.Exitf(ctx, "Runner %v not registered. Forgot to _ import
it?", runner)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 09b62c6..96ecde0 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -84,24 +84,24 @@ var unique int32
// Execute runs the given pipeline on Google Cloud Dataflow. It uses the
// default application credentials to submit the job.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
// (1) Gather job options
project := *gcpopts.Project
if project == "" {
- return errors.New("no Google Cloud project specified. Use
--project=<project>")
+ return nil, errors.New("no Google Cloud project specified. Use
--project=<project>")
}
region := gcpopts.GetRegion(ctx)
if region == "" {
- return errors.New("No Google Cloud region specified. Use
--region=<region>. See
https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
+ return nil, errors.New("No Google Cloud region specified. Use
--region=<region>. See
https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
}
if *stagingLocation == "" {
- return errors.New("no GCS staging location specified. Use
--staging_location=gs://<bucket>/<path>")
+ return nil, errors.New("no GCS staging location specified. Use
--staging_location=gs://<bucket>/<path>")
}
var jobLabels map[string]string
if *labels != "" {
if err := json.Unmarshal([]byte(*labels), &jobLabels); err !=
nil {
- return errors.Wrapf(err, "error reading --label flag as
JSON")
+ return nil, errors.Wrapf(err, "error reading --label
flag as JSON")
}
}
@@ -120,7 +120,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
}
if *autoscalingAlgorithm != "" {
if *autoscalingAlgorithm != "NONE" && *autoscalingAlgorithm !=
"THROUGHPUT_BASED" {
- return errors.New("invalid autoscaling algorithm. Use
--autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
+ return nil, errors.New("invalid autoscaling algorithm.
Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
}
}
@@ -173,15 +173,15 @@ func Execute(ctx context.Context, p *beam.Pipeline) error
{
edges, _, err := p.Build()
if err != nil {
- return err
+ return nil, err
}
enviroment, err := graphx.CreateEnvironment(ctx,
jobopts.GetEnvironmentUrn(ctx), getContainerImage)
if err != nil {
- return errors.WithContext(err, "generating model pipeline")
+ return nil, errors.WithContext(err, "generating model pipeline")
}
model, err := graphx.Marshal(edges, &graphx.Options{Environment:
enviroment})
if err != nil {
- return errors.WithContext(err, "generating model pipeline")
+ return nil, errors.WithContext(err, "generating model pipeline")
}
// NOTE(herohde) 10/8/2018: the last segment of the names must be
"worker" and "dataflow-worker.jar".
@@ -197,14 +197,14 @@ func Execute(ctx context.Context, p *beam.Pipeline) error
{
log.Info(ctx, proto.MarshalTextString(model))
job, err := dataflowlib.Translate(ctx, model, opts, workerURL,
jarURL, modelURL)
if err != nil {
- return err
+ return nil, err
}
dataflowlib.PrintJob(ctx, job)
- return nil
+ return nil, nil
}
_, err = dataflowlib.Execute(ctx, model, opts, workerURL, jarURL,
modelURL, *endpoint, *executeAsync)
- return err
+ return nil, err
}
func gcsRecorderHook(opts []string) perf.CaptureHook {
bucket, prefix, err := gcsx.ParseObject(opts[0])
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go
b/sdks/go/pkg/beam/runners/direct/direct.go
index a2c3807..5f07135 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -37,7 +37,7 @@ func init() {
}
// Execute runs the pipeline in-process.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
log.Info(ctx, "Executing pipeline with the direct runner.")
if !beam.Initialized() {
@@ -49,33 +49,33 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
if *jobopts.Strict {
log.Info(ctx, "Strict mode enabled, applying additional
validation.")
- if err := vet.Execute(ctx, p); err != nil {
- return errors.Wrap(err, "strictness check failed")
+ if _, err := vet.Execute(ctx, p); err != nil {
+ return nil, errors.Wrap(err, "strictness check failed")
}
log.Info(ctx, "Strict mode validation passed.")
}
edges, _, err := p.Build()
if err != nil {
- return errors.Wrap(err, "invalid pipeline")
+ return nil, errors.Wrap(err, "invalid pipeline")
}
plan, err := Compile(edges)
if err != nil {
- return errors.Wrap(err, "translation failed")
+ return nil, errors.Wrap(err, "translation failed")
}
log.Info(ctx, plan)
if err = plan.Execute(ctx, "", exec.DataContext{}); err != nil {
plan.Down(ctx) // ignore any teardown errors
- return err
+ return nil, err
}
if err = plan.Down(ctx); err != nil {
- return err
+ return nil, err
}
// TODO(lostluck) 2020/01/24: What's the right way to expose the
// metrics store for the direct runner?
metrics.DumpToLogFromStore(ctx, plan.Store())
- return nil
+ return nil, nil
}
// Compile translates a pipeline to a multi-bundle execution plan.
diff --git a/sdks/go/pkg/beam/runners/dot/dot.go
b/sdks/go/pkg/beam/runners/dot/dot.go
index 547aa10..9a95ece 100644
--- a/sdks/go/pkg/beam/runners/dot/dot.go
+++ b/sdks/go/pkg/beam/runners/dot/dot.go
@@ -37,19 +37,19 @@ func init() {
var dotFile = flag.String("dot_file", "", "DOT output file to create")
// Execute produces a DOT representation of the pipeline.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
if *dotFile == "" {
- return errors.New("must supply dot_file argument")
+ return nil, errors.New("must supply dot_file argument")
}
edges, nodes, err := p.Build()
if err != nil {
- return errors.New("can't get data to render")
+ return nil, errors.New("can't get data to render")
}
var buf bytes.Buffer
if err := dotlib.Render(edges, nodes, &buf); err != nil {
- return err
+ return nil, err
}
- return ioutil.WriteFile(*dotFile, buf.Bytes(), 0644)
+ return nil, ioutil.WriteFile(*dotFile, buf.Bytes(), 0644)
}
diff --git a/sdks/go/pkg/beam/runners/flink/flink.go
b/sdks/go/pkg/beam/runners/flink/flink.go
index 0384b98..8dd0157 100644
--- a/sdks/go/pkg/beam/runners/flink/flink.go
+++ b/sdks/go/pkg/beam/runners/flink/flink.go
@@ -29,6 +29,6 @@ func init() {
// Execute runs the given pipeline on Flink. Convenience wrapper over the
// universal runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
return universal.Execute(ctx, p)
}
diff --git a/sdks/go/pkg/beam/runners/spark/spark.go
b/sdks/go/pkg/beam/runners/spark/spark.go
index c216c59..5d67200 100644
--- a/sdks/go/pkg/beam/runners/spark/spark.go
+++ b/sdks/go/pkg/beam/runners/spark/spark.go
@@ -29,6 +29,6 @@ func init() {
// Execute runs the given pipeline on Spark. Convenience wrapper over the
// universal runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
return universal.Execute(ctx, p)
}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index dbb3387..38d7a75 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -20,6 +20,8 @@ import (
"os"
"time"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
@@ -29,19 +31,20 @@ import (
// Execute executes a pipeline on the universal runner serving the given
endpoint.
// Convenience function.
-func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt
*JobOptions, async bool) (string, error) {
+func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt
*JobOptions, async bool) (*universalPipelineResult, error) {
// (1) Prepare job to obtain artifact staging instructions.
+ presult := &universalPipelineResult{JobID: ""}
cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
if err != nil {
- return "", errors.WithContextf(err, "connecting to job service")
+ return presult, errors.WithContextf(err, "connecting to job
service")
}
defer cc.Close()
client := jobpb.NewJobServiceClient(cc)
prepID, artifactEndpoint, st, err := Prepare(ctx, client, p, opt)
if err != nil {
- return "", err
+ return presult, err
}
log.Infof(ctx, "Prepared job with id: %v and staging token: %v",
prepID, st)
@@ -58,7 +61,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
worker, err := BuildTempWorkerBinary(ctx)
if err != nil {
- return "", err
+ return presult, err
}
defer os.Remove(worker)
@@ -70,7 +73,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
token, err := Stage(ctx, prepID, artifactEndpoint, bin, st)
if err != nil {
- return "", err
+ return presult, err
}
log.Infof(ctx, "Staged binary artifact with token: %v", token)
@@ -79,7 +82,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
jobID, err := Submit(ctx, client, prepID, token)
if err != nil {
- return "", err
+ return presult, err
}
log.Infof(ctx, "Submitted job: %v", jobID)
@@ -87,7 +90,36 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
// (4) Wait for completion.
if async {
- return jobID, nil
+ return presult, nil
}
- return jobID, WaitForCompletion(ctx, client, jobID)
+ err = WaitForCompletion(ctx, client, jobID)
+
+ res, err := newUniversalPipelineResult(ctx, jobID, client)
+ if err != nil {
+ return presult, err
+ }
+ presult = res
+
+ return presult, err
+}
+
+type universalPipelineResult struct {
+ JobID string
+ metrics *metrics.Results
+}
+
+func newUniversalPipelineResult(ctx context.Context, jobID string, client
jobpb.JobServiceClient) (*universalPipelineResult, error) {
+ request := &jobpb.GetJobMetricsRequest{JobId: jobID}
+ response, err := client.GetJobMetrics(ctx, request)
+ if err != nil {
+ return &universalPipelineResult{jobID, nil}, errors.Wrap(err,
"failed to get metrics")
+ }
+
+ monitoredStates := response.GetMetrics()
+ metrics := metricsx.FromMonitoringInfos(monitoredStates.Attempted,
monitoredStates.Committed)
+ return &universalPipelineResult{jobID, metrics}, nil
+}
+
+func (pr universalPipelineResult) Metrics() metrics.Results {
+ return *pr.metrics
}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go
b/sdks/go/pkg/beam/runners/universal/universal.go
index de5378c..3ad1a18 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -42,27 +42,27 @@ func init() {
}
// Execute executes the pipeline on a universal beam runner.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
if !beam.Initialized() {
panic(fmt.Sprint("Beam has not been initialized. Call
beam.Init() before pipeline construction."))
}
if *jobopts.Strict {
log.Info(ctx, "Strict mode enabled, applying additional
validation.")
- if err := vet.Execute(ctx, p); err != nil {
- return errors.Wrap(err, "strictness check failed")
+ if _, err := vet.Execute(ctx, p); err != nil {
+ return nil, errors.Wrap(err, "strictness check failed")
}
log.Info(ctx, "Strict mode validation passed.")
}
endpoint, err := jobopts.GetEndpoint()
if err != nil {
- return err
+ return nil, err
}
edges, _, err := p.Build()
if err != nil {
- return err
+ return nil, err
}
envUrn := jobopts.GetEnvironmentUrn(ctx)
getEnvCfg := jobopts.GetEnvironmentConfig
@@ -71,7 +71,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
// TODO(BEAM-10610): Allow user configuration of this port,
rather than kernel selected.
srv, err := extworker.StartLoopback(ctx, 0)
if err != nil {
- return err
+ return nil, err
}
defer srv.Stop(ctx)
getEnvCfg = srv.EnvironmentConfig
@@ -79,11 +79,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
enviroment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
if err != nil {
- return errors.WithContextf(err, "generating model pipeline")
+ return nil, errors.WithContextf(err, "generating model
pipeline")
}
pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment:
enviroment})
if err != nil {
- return errors.WithContextf(err, "generating model pipeline")
+ return nil, errors.WithContextf(err, "generating model
pipeline")
}
// Fetch all dependencies for cross-language transforms
@@ -97,6 +97,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
Worker: *jobopts.WorkerBinary,
RetainDocker: *jobopts.RetainDockerContainers,
}
- _, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
- return err
+ presult, err := runnerlib.Execute(ctx, pipeline, endpoint, opt,
*jobopts.Async)
+ return presult, err
}
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go
b/sdks/go/pkg/beam/runners/vet/vet.go
index 4ce0e2f..268852a 100644
--- a/sdks/go/pkg/beam/runners/vet/vet.go
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -58,10 +58,10 @@ func (p disabledResolver) Sym2Addr(name string) (uintptr,
error) {
}
// Execute evaluates the pipeline on whether it can run without reflection.
-func Execute(ctx context.Context, p *beam.Pipeline) error {
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult,
error) {
e, err := Evaluate(ctx, p)
if err != nil {
- return errors.WithContext(err, "validating pipeline with vet
runner")
+ return nil, errors.WithContext(err, "validating pipeline with
vet runner")
}
if !e.Performant() {
e.summary()
@@ -69,10 +69,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
e.diag("*/\n")
err := errors.Errorf("pipeline is not performant, see
diagnostic summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
err = errors.WithContext(err, "validating pipeline with vet
runner")
- return errors.SetTopLevelMsg(err, "pipeline is not performant")
+ return nil, errors.SetTopLevelMsg(err, "pipeline is not
performant")
}
	// Pipeline has no further tasks.
- return nil
+ return nil, nil
}
// Evaluate returns an object that can generate necessary shims and inits.
diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go
b/sdks/go/pkg/beam/testing/ptest/ptest.go
index fe23f6c..602303a 100644
--- a/sdks/go/pkg/beam/testing/ptest/ptest.go
+++ b/sdks/go/pkg/beam/testing/ptest/ptest.go
@@ -73,7 +73,8 @@ func Run(p *beam.Pipeline) error {
if *Runner == "" {
*Runner = defaultRunner
}
- return beam.Run(context.Background(), *Runner, p)
+ _, err := beam.Run(context.Background(), *Runner, p)
+ return err
}
// Main is an implementation of testing's TestMain to permit testing
diff --git a/sdks/go/pkg/beam/x/beamx/run.go b/sdks/go/pkg/beam/x/beamx/run.go
index 9a1d27a..77d766b 100644
--- a/sdks/go/pkg/beam/x/beamx/run.go
+++ b/sdks/go/pkg/beam/x/beamx/run.go
@@ -40,5 +40,13 @@ var runner = flag.String("runner", "direct", "Pipeline
runner.")
// defaults to the direct runner, but all beam-distributed runners and textio
// filesystems are implicitly registered.
func Run(ctx context.Context, p *beam.Pipeline) error {
+ _, err := beam.Run(ctx, *runner, p)
+ return err
+}
+
+// RunWithMetrics invokes beam.Run with the runner supplied by the
+// flag "runner". Returns a beam.PipelineResult object, which can be
+// accessed to query the pipeline's metrics.
+func RunWithMetrics(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error) {
return beam.Run(ctx, *runner, p)
}