This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ededcb5 [BEAM-11217] Metrics querying for Pcol metrics (#15923)
ededcb5 is described below
commit ededcb50149eab212605690211b90c9eeb747fcb
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Tue Nov 9 17:32:30 2021 -0500
[BEAM-11217] Metrics querying for Pcol metrics (#15923)
---
sdks/go/pkg/beam/core/metrics/metrics.go | 89 ++++++++++++++++++++--
sdks/go/pkg/beam/core/metrics/metrics_test.go | 37 +++++++++
sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go | 62 ++++++++++++---
.../beam/core/runtime/metricsx/metricsx_test.go | 9 ++-
.../beam/runners/dataflow/dataflowlib/metrics.go | 2 +-
.../beam/runners/universal/runnerlib/execute.go | 6 +-
6 files changed, 180 insertions(+), 25 deletions(-)
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go
b/sdks/go/pkg/beam/core/metrics/metrics.go
index 2c636e2..0598c7e 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -479,11 +479,17 @@ func (m *executionState) kind() kind {
return kindDoFnMsec
}
-// MsecValue is the value of a single msec metric
+// MsecValue is the value of a single msec metric.
type MsecValue struct {
Start, Process, Finish, Total time.Duration
}
+// PColValue is the value of a single PCollection metric.
+type PColValue struct {
+ ElementCount int64
+ SampledByteSize DistributionValue
+}
+
// Results represents all metrics gathered during the job's execution.
// It allows for querying metrics using a provided filter.
type Results struct {
@@ -491,6 +497,7 @@ type Results struct {
distributions []DistributionResult
gauges []GaugeResult
msecs []MsecResult
+ pCols []PColResult
}
// NewResults creates a new Results.
@@ -498,17 +505,16 @@ func NewResults(
counters []CounterResult,
distributions []DistributionResult,
gauges []GaugeResult,
- msecs []MsecResult) *Results {
- return &Results{counters, distributions, gauges, msecs}
+ msecs []MsecResult,
+ pCols []PColResult) *Results {
+ return &Results{counters, distributions, gauges, msecs, pCols}
}
// AllMetrics returns all metrics from a Results instance.
func (mr Results) AllMetrics() QueryResults {
- return QueryResults{mr.counters, mr.distributions, mr.gauges, mr.msecs}
+ return QueryResults{mr.counters, mr.distributions, mr.gauges, mr.msecs,
mr.pCols}
}
-// TODO(BEAM-11217): Implement querying metrics by DoFn
-
// SingleResult interface facilitates metrics query filtering methods.
type SingleResult interface {
Name() string
@@ -525,6 +531,7 @@ func (mr Results) Query(f func(SingleResult) bool)
QueryResults {
distributions := []DistributionResult{}
gauges := []GaugeResult{}
msecs := []MsecResult{}
+ pCols := []PColResult{}
for _, counter := range mr.counters {
if f(counter) {
@@ -546,7 +553,12 @@ func (mr Results) Query(f func(SingleResult) bool)
QueryResults {
msecs = append(msecs, msec)
}
}
- return QueryResults{counters: counters, distributions: distributions,
gauges: gauges, msecs: msecs}
+ for _, pCol := range mr.pCols {
+ if f(pCol) {
+ pCols = append(pCols, pCol)
+ }
+ }
+ return QueryResults{counters: counters, distributions: distributions,
gauges: gauges, msecs: msecs, pCols: pCols}
}
// QueryResults is the result of a query. Allows accessing all of the
@@ -556,6 +568,7 @@ type QueryResults struct {
distributions []DistributionResult
gauges []GaugeResult
msecs []MsecResult
+ pCols []PColResult
}
// Counters returns a slice of counter metrics.
@@ -586,6 +599,13 @@ func (qr QueryResults) Msecs() []MsecResult {
return out
}
+// PCols returns a slice of PCollection metrics.
+func (qr QueryResults) PCols() []PColResult {
+ out := make([]PColResult, len(qr.pCols))
+ copy(out, qr.pCols)
+ return out
+}
+
// CounterResult is an attempted and a committed value of a counter metric plus
// key.
type CounterResult struct {
@@ -725,6 +745,61 @@ func (r GaugeResult) Namespace() string {
// Transform returns the Transform step for this GaugeResult.
func (r GaugeResult) Transform() string { return r.Key.Step }
+// PColResult is an attempted and a committed value of a PCollection
+// metric plus key.
+type PColResult struct {
+ Attempted, Committed PColValue
+ Key StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if
committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r PColResult) Result() PColValue {
+ empty := PColValue{}
+ if r.Committed != empty {
+ return r.Committed
+ }
+ return r.Attempted
+}
+
+// Name returns the Name of this PCollection Result.
+func (r PColResult) Name() string {
+ return ""
+}
+
+// Namespace returns the Namespace of this PCollection Result.
+func (r PColResult) Namespace() string {
+ return ""
+}
+
+// Transform returns the Transform step for this PCollection Result.
+func (r PColResult) Transform() string { return r.Key.Step }
+
+// MergePCols combines PCollection metrics that share a common key.
+func MergePCols(
+ attempted map[StepKey]PColValue,
+ committed map[StepKey]PColValue) []PColResult {
+ res := make([]PColResult, 0)
+ merged := map[StepKey]PColResult{}
+
+ for k, v := range attempted {
+ merged[k] = PColResult{Attempted: v, Key: k}
+ }
+ for k, v := range committed {
+ m, ok := merged[k]
+ if ok {
+ merged[k] = PColResult{Attempted: m.Attempted,
Committed: v, Key: k}
+ } else {
+ merged[k] = PColResult{Committed: v, Key: k}
+ }
+ }
+
+ for _, v := range merged {
+ res = append(res, v)
+ }
+ return res
+}
+
// StepKey uniquely identifies a metric within a pipeline graph.
type StepKey struct {
Step, Name, Namespace string
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go
b/sdks/go/pkg/beam/core/metrics/metrics_test.go
index d05babd..ff3141d 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics_test.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -486,6 +486,43 @@ func TestMsecQueryResult(t *testing.T) {
}
}
+func TestPcolQueryResult(t *testing.T) {
+ realKey := StepKey{Step: "sumFn"}
+ pcolA := PColValue{}
+ pcolB := PColValue{ElementCount: 1, SampledByteSize:
DistributionValue{1, 1, 1, 1}}
+ pcolR := PColResult{Attempted: pcolA, Committed: pcolB, Key: realKey}
+ res := Results{pCols: []PColResult{pcolR}}
+
+ tests := []struct {
+ name string
+ queryResult Results
+ query string
+ want QueryResults
+ }{
+ {
+ name: "present",
+ queryResult: res,
+ query: "sumFn",
+ want: QueryResults{pCols: []PColResult{pcolR}},
+ }, {
+ name: "not present",
+ queryResult: res,
+ query: "countFn",
+ want: QueryResults{},
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ got := res.Query(func(sr SingleResult) bool {
+ return strings.Contains(sr.Transform(),
test.query)
+ })
+ if len(got.PCols()) != len(test.want.PCols()) {
+ t.Errorf("(Results).Query(by Transform %v) =
%v, want = %v", test.query, got.PCols(), test.want.PCols())
+ }
+ })
+ }
+}
+
// Run on @lostluck's desktop (2020/01/21) go1.13.4
//
// Allocs & bytes should be consistent within go versions, but ns/op is
relative to the running machine.
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
index 46396e1..dcf88b0 100644
--- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
@@ -28,25 +28,37 @@ import (
// FromMonitoringInfos extracts metrics from monitored states and
// groups them into counters, distributions and gauges.
-func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed
[]*pipepb.MonitoringInfo) *metrics.Results {
- ac, ad, ag, am := groupByType(attempted)
- cc, cd, cg, cm := groupByType(committed)
+func FromMonitoringInfos(p *pipepb.Pipeline, attempted
[]*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) *metrics.Results {
+ ac, ad, ag, am, ap := groupByType(p, attempted)
+ cc, cd, cg, cm, cp := groupByType(p, committed)
- return metrics.NewResults(metrics.MergeCounters(ac, cc),
metrics.MergeDistributions(ad, cd), metrics.MergeGauges(ag, cg),
metrics.MergeMsecs(am, cm))
+ return metrics.NewResults(metrics.MergeCounters(ac, cc),
metrics.MergeDistributions(ad, cd), metrics.MergeGauges(ag, cg),
metrics.MergeMsecs(am, cm), metrics.MergePCols(ap, cp))
}
-func groupByType(minfos []*pipepb.MonitoringInfo) (
+func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
map[metrics.StepKey]int64,
map[metrics.StepKey]metrics.DistributionValue,
map[metrics.StepKey]metrics.GaugeValue,
- map[metrics.StepKey]metrics.MsecValue) {
+ map[metrics.StepKey]metrics.MsecValue,
+ map[metrics.StepKey]metrics.PColValue) {
counters := make(map[metrics.StepKey]int64)
distributions := make(map[metrics.StepKey]metrics.DistributionValue)
gauges := make(map[metrics.StepKey]metrics.GaugeValue)
msecs := make(map[metrics.StepKey]metrics.MsecValue)
+ pcols := make(map[metrics.StepKey]metrics.PColValue)
+
+ // Map each PCollection id to its producing transform's output name, using the pipeline proto.
+ pcolToTransform := make(map[string]string)
+
+ for _, transform := range p.GetComponents().GetTransforms() {
+ outputs := transform.GetOutputs()
+ for o, pid := range outputs {
+ pcolToTransform[pid] = fmt.Sprintf("%s.%s",
transform.GetUniqueName(), o)
+ }
+ }
for _, minfo := range minfos {
- key, err := extractKey(minfo)
+ key, err := extractKey(minfo, pcolToTransform)
if err != nil {
log.Println(err)
continue
@@ -100,17 +112,38 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
v.Total = value
}
msecs[key] = v
+ case UrnToString(UrnElementCount):
+ value, err := extractCounterValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ v := pcols[key]
+ v.ElementCount = value
+ pcols[key] = v
+ case UrnToString(UrnSampledByteSize):
+ value, err := extractDistributionValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ v := pcols[key]
+ v.SampledByteSize = value
+ pcols[key] = v
default:
log.Println("unknown metric type")
}
}
- return counters, distributions, gauges, msecs
+ return counters, distributions, gauges, msecs, pcols
}
-func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
+func extractKey(mi *pipepb.MonitoringInfo, pcolToTransform map[string]string)
(metrics.StepKey, error) {
labels := newLabels(mi.GetLabels())
stepName := labels.Transform()
+ if v, ok := pcolToTransform[labels.PCollection()]; ok {
+ stepName = v
+ }
if stepName == "" {
return metrics.StepKey{}, fmt.Errorf("Failed to deduce Step
from MonitoringInfo: %v", mi)
}
@@ -150,8 +183,15 @@ func extractGaugeValue(reader *bytes.Reader)
(metrics.GaugeValue, error) {
}
func newLabels(miLabels map[string]string) *metrics.Labels {
- labels := metrics.UserLabels(miLabels["PTRANSFORM"],
miLabels["NAMESPACE"], miLabels["NAME"])
- return &labels
+ if miLabels["PTRANSFORM"] != "" {
+ labels := metrics.UserLabels(miLabels["PTRANSFORM"],
miLabels["NAMESPACE"], miLabels["NAME"])
+ return &labels
+ }
+ if miLabels["PCOLLECTION"] != "" {
+ labels := metrics.PCollectionLabels(miLabels["PCOLLECTION"])
+ return &labels
+ }
+ return &metrics.Labels{}
}
func decodeMany(reader *bytes.Reader, size int) ([]int64, error) {
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
index d5406da..20492ab 100644
--- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -55,8 +55,9 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
attempted := []*pipepb.MonitoringInfo{mInfo}
committed := []*pipepb.MonitoringInfo{}
+ p := &pipepb.Pipeline{}
- got := FromMonitoringInfos(attempted, committed).AllMetrics().Counters()
+ got := FromMonitoringInfos(p, attempted,
committed).AllMetrics().Counters()
size := len(got)
if size < 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
@@ -104,8 +105,9 @@ func TestFromMonitoringInfos_Distributions(t *testing.T) {
attempted := []*pipepb.MonitoringInfo{mInfo}
committed := []*pipepb.MonitoringInfo{}
+ p := &pipepb.Pipeline{}
- got := FromMonitoringInfos(attempted,
committed).AllMetrics().Distributions()
+ got := FromMonitoringInfos(p, attempted,
committed).AllMetrics().Distributions()
size := len(got)
if size < 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
@@ -153,8 +155,9 @@ func TestFromMonitoringInfos_Gauges(t *testing.T) {
attempted := []*pipepb.MonitoringInfo{mInfo}
committed := []*pipepb.MonitoringInfo{}
+ p := &pipepb.Pipeline{}
- got := FromMonitoringInfos(attempted, committed).AllMetrics().Gauges()
+ got := FromMonitoringInfos(p, attempted,
committed).AllMetrics().Gauges()
size := len(got)
if size < 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
index b17296f..f4ccf5f 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
@@ -33,7 +33,7 @@ func FromMetricUpdates(allMetrics []*df.MetricUpdate, p
*pipepb.Pipeline) *metri
ac, ad := groupByType(allMetrics, p, true)
cc, cd := groupByType(allMetrics, p, false)
- return metrics.NewResults(metrics.MergeCounters(ac, cc),
metrics.MergeDistributions(ad, cd), make([]metrics.GaugeResult, 0),
make([]metrics.MsecResult, 0))
+ return metrics.NewResults(metrics.MergeCounters(ac, cc),
metrics.MergeDistributions(ad, cd), make([]metrics.GaugeResult, 0),
make([]metrics.MsecResult, 0), make([]metrics.PColResult, 0))
}
func groupByType(allMetrics []*df.MetricUpdate, p *pipepb.Pipeline, tentative
bool) (
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 2d7672a..c7371db 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -94,7 +94,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
}
err = WaitForCompletion(ctx, client, jobID)
- res, presultErr := newUniversalPipelineResult(ctx, jobID, client)
+ res, presultErr := newUniversalPipelineResult(ctx, jobID, client, p)
if presultErr != nil {
if err != nil {
return presult, errors.Wrap(err, presultErr.Error())
@@ -109,7 +109,7 @@ type universalPipelineResult struct {
metrics *metrics.Results
}
-func newUniversalPipelineResult(ctx context.Context, jobID string, client
jobpb.JobServiceClient) (*universalPipelineResult, error) {
+func newUniversalPipelineResult(ctx context.Context, jobID string, client
jobpb.JobServiceClient, p *pipepb.Pipeline) (*universalPipelineResult, error) {
request := &jobpb.GetJobMetricsRequest{JobId: jobID}
response, err := client.GetJobMetrics(ctx, request)
if err != nil {
@@ -117,7 +117,7 @@ func newUniversalPipelineResult(ctx context.Context, jobID
string, client jobpb.
}
monitoredStates := response.GetMetrics()
- metrics := metricsx.FromMonitoringInfos(monitoredStates.Attempted,
monitoredStates.Committed)
+ metrics := metricsx.FromMonitoringInfos(p, monitoredStates.Attempted,
monitoredStates.Committed)
return &universalPipelineResult{jobID, metrics}, nil
}