This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ededcb5  [BEAM-11217] Metrics querying for Pcol metrics (#15923)
ededcb5 is described below

commit ededcb50149eab212605690211b90c9eeb747fcb
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Tue Nov 9 17:32:30 2021 -0500

    [BEAM-11217] Metrics querying for Pcol metrics (#15923)
---
 sdks/go/pkg/beam/core/metrics/metrics.go           | 89 ++++++++++++++++++++--
 sdks/go/pkg/beam/core/metrics/metrics_test.go      | 37 +++++++++
 sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go | 62 ++++++++++++---
 .../beam/core/runtime/metricsx/metricsx_test.go    |  9 ++-
 .../beam/runners/dataflow/dataflowlib/metrics.go   |  2 +-
 .../beam/runners/universal/runnerlib/execute.go    |  6 +-
 6 files changed, 180 insertions(+), 25 deletions(-)

diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go b/sdks/go/pkg/beam/core/metrics/metrics.go
index 2c636e2..0598c7e 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -479,11 +479,17 @@ func (m *executionState) kind() kind {
        return kindDoFnMsec
 }
 
-// MsecValue is the value of a single msec metric
+// MsecValue is the value of a single msec metric.
 type MsecValue struct {
        Start, Process, Finish, Total time.Duration
 }
 
+// PColValue is the value of a single PCollection metric.
+type PColValue struct {
+       ElementCount    int64
+       SampledByteSize DistributionValue
+}
+
 // Results represents all metrics gathered during the job's execution.
 // It allows for querying metrics using a provided filter.
 type Results struct {
@@ -491,6 +497,7 @@ type Results struct {
        distributions []DistributionResult
        gauges        []GaugeResult
        msecs         []MsecResult
+       pCols         []PColResult
 }
 
 // NewResults creates a new Results.
@@ -498,17 +505,16 @@ func NewResults(
        counters []CounterResult,
        distributions []DistributionResult,
        gauges []GaugeResult,
-       msecs []MsecResult) *Results {
-       return &Results{counters, distributions, gauges, msecs}
+       msecs []MsecResult,
+       pCols []PColResult) *Results {
+       return &Results{counters, distributions, gauges, msecs, pCols}
 }
 
 // AllMetrics returns all metrics from a Results instance.
 func (mr Results) AllMetrics() QueryResults {
-       return QueryResults{mr.counters, mr.distributions, mr.gauges, mr.msecs}
+       return QueryResults{mr.counters, mr.distributions, mr.gauges, mr.msecs, mr.pCols}
 }
 
-// TODO(BEAM-11217): Implement querying metrics by DoFn
-
 // SingleResult interface facilitates metrics query filtering methods.
 type SingleResult interface {
        Name() string
@@ -525,6 +531,7 @@ func (mr Results) Query(f func(SingleResult) bool) QueryResults {
        distributions := []DistributionResult{}
        gauges := []GaugeResult{}
        msecs := []MsecResult{}
+       pCols := []PColResult{}
 
        for _, counter := range mr.counters {
                if f(counter) {
@@ -546,7 +553,12 @@ func (mr Results) Query(f func(SingleResult) bool) QueryResults {
                        msecs = append(msecs, msec)
                }
        }
-       return QueryResults{counters: counters, distributions: distributions, gauges: gauges, msecs: msecs}
+       for _, pCol := range mr.pCols {
+               if f(pCol) {
+                       pCols = append(pCols, pCol)
+               }
+       }
+       return QueryResults{counters: counters, distributions: distributions, gauges: gauges, msecs: msecs, pCols: pCols}
 }
 
 // QueryResults is the result of a query. Allows accessing all of the
@@ -556,6 +568,7 @@ type QueryResults struct {
        distributions []DistributionResult
        gauges        []GaugeResult
        msecs         []MsecResult
+       pCols         []PColResult
 }
 
 // Counters returns a slice of counter metrics.
@@ -586,6 +599,13 @@ func (qr QueryResults) Msecs() []MsecResult {
        return out
 }
 
+// PCols returns a slice of PCollection metrics.
+func (qr QueryResults) PCols() []PColResult {
+       out := make([]PColResult, len(qr.pCols))
+       copy(out, qr.pCols)
+       return out
+}
+
 // CounterResult is an attempted and a commited value of a counter metric plus
 // key.
 type CounterResult struct {
@@ -725,6 +745,61 @@ func (r GaugeResult) Namespace() string {
 // Transform returns the Transform step for this GaugeResult.
 func (r GaugeResult) Transform() string { return r.Key.Step }
 
+// PColResult is an attempted and a commited value of a pcollection
+// metric plus key.
+type PColResult struct {
+       Attempted, Committed PColValue
+       Key                  StepKey
+}
+
+// Result returns committed metrics. Falls back to attempted metrics if committed
+// are not populated (e.g. due to not being supported on a given runner).
+func (r PColResult) Result() PColValue {
+       empty := PColValue{}
+       if r.Committed != empty {
+               return r.Committed
+       }
+       return r.Attempted
+}
+
+// Name returns the Name of this Pcollection Result.
+func (r PColResult) Name() string {
+       return ""
+}
+
+// Namespace returns the Namespace of this Pcollection Result.
+func (r PColResult) Namespace() string {
+       return ""
+}
+
+// Transform returns the Transform step for this Pcollection Result.
+func (r PColResult) Transform() string { return r.Key.Step }
+
+// MergePCols combines pcollection metrics that share a common key.
+func MergePCols(
+       attempted map[StepKey]PColValue,
+       committed map[StepKey]PColValue) []PColResult {
+       res := make([]PColResult, 0)
+       merged := map[StepKey]PColResult{}
+
+       for k, v := range attempted {
+               merged[k] = PColResult{Attempted: v, Key: k}
+       }
+       for k, v := range committed {
+               m, ok := merged[k]
+               if ok {
+                       merged[k] = PColResult{Attempted: m.Attempted, Committed: v, Key: k}
+               } else {
+                       merged[k] = PColResult{Committed: v, Key: k}
+               }
+       }
+
+       for _, v := range merged {
+               res = append(res, v)
+       }
+       return res
+}
+
 // StepKey uniquely identifies a metric within a pipeline graph.
 type StepKey struct {
        Step, Name, Namespace string
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go b/sdks/go/pkg/beam/core/metrics/metrics_test.go
index d05babd..ff3141d 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics_test.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -486,6 +486,43 @@ func TestMsecQueryResult(t *testing.T) {
        }
 }
 
+func TestPcolQueryResult(t *testing.T) {
+       realKey := StepKey{Step: "sumFn"}
+       pcolA := PColValue{}
+       pcolB := PColValue{ElementCount: 1, SampledByteSize: DistributionValue{1, 1, 1, 1}}
+       pcolR := PColResult{Attempted: pcolA, Committed: pcolB, Key: realKey}
+       res := Results{pCols: []PColResult{pcolR}}
+
+       tests := []struct {
+               name        string
+               queryResult Results
+               query       string
+               want        QueryResults
+       }{
+               {
+                       name:        "present",
+                       queryResult: res,
+                       query:       "sumFn",
+                       want:        QueryResults{pCols: []PColResult{pcolR}},
+               }, {
+                       name:        "not present",
+                       queryResult: res,
+                       query:       "countFn",
+                       want:        QueryResults{},
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       got := res.Query(func(sr SingleResult) bool {
+                               return strings.Contains(sr.Transform(), test.query)
+                       })
+                       if len(got.PCols()) != len(test.want.PCols()) {
+                               t.Errorf("(Results).Query(by Transform %v) = %v, want = %v", test.query, got.PCols(), test.want.PCols())
+                       }
+               })
+       }
+}
+
 // Run on @lostluck's desktop (2020/01/21) go1.13.4
 //
 // Allocs & bytes should be consistent within go versions, but ns/op is relative to the running machine.
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
index 46396e1..dcf88b0 100644
--- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
@@ -28,25 +28,37 @@ import (
 
 // FromMonitoringInfos extracts metrics from monitored states and
 // groups them into counters, distributions and gauges.
-func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) *metrics.Results {
-       ac, ad, ag, am := groupByType(attempted)
-       cc, cd, cg, cm := groupByType(committed)
+func FromMonitoringInfos(p *pipepb.Pipeline, attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) *metrics.Results {
+       ac, ad, ag, am, ap := groupByType(p, attempted)
+       cc, cd, cg, cm, cp := groupByType(p, committed)
 
-       return metrics.NewResults(metrics.MergeCounters(ac, cc), metrics.MergeDistributions(ad, cd), metrics.MergeGauges(ag, cg), metrics.MergeMsecs(am, cm))
+       return metrics.NewResults(metrics.MergeCounters(ac, cc), metrics.MergeDistributions(ad, cd), metrics.MergeGauges(ag, cg), metrics.MergeMsecs(am, cm), metrics.MergePCols(ap, cp))
 }
 
-func groupByType(minfos []*pipepb.MonitoringInfo) (
+func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
        map[metrics.StepKey]int64,
        map[metrics.StepKey]metrics.DistributionValue,
        map[metrics.StepKey]metrics.GaugeValue,
-       map[metrics.StepKey]metrics.MsecValue) {
+       map[metrics.StepKey]metrics.MsecValue,
+       map[metrics.StepKey]metrics.PColValue) {
        counters := make(map[metrics.StepKey]int64)
        distributions := make(map[metrics.StepKey]metrics.DistributionValue)
        gauges := make(map[metrics.StepKey]metrics.GaugeValue)
        msecs := make(map[metrics.StepKey]metrics.MsecValue)
+       pcols := make(map[metrics.StepKey]metrics.PColValue)
+
+       // extract pcol for a PTransform into a map from pipeline proto.
+       pcolToTransform := make(map[string]string)
+
+       for _, transform := range p.GetComponents().GetTransforms() {
+               outputs := transform.GetOutputs()
+               for o, pid := range outputs {
+                       pcolToTransform[pid] = fmt.Sprintf("%s.%s", transform.GetUniqueName(), o)
+               }
+       }
 
        for _, minfo := range minfos {
-               key, err := extractKey(minfo)
+               key, err := extractKey(minfo, pcolToTransform)
                if err != nil {
                        log.Println(err)
                        continue
@@ -100,17 +112,38 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
                                v.Total = value
                        }
                        msecs[key] = v
+               case UrnToString(UrnElementCount):
+                       value, err := extractCounterValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       v := pcols[key]
+                       v.ElementCount = value
+                       pcols[key] = v
+               case UrnToString(UrnSampledByteSize):
+                       value, err := extractDistributionValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       v := pcols[key]
+                       v.SampledByteSize = value
+                       pcols[key] = v
                default:
                        log.Println("unknown metric type")
                }
        }
-       return counters, distributions, gauges, msecs
+       return counters, distributions, gauges, msecs, pcols
 }
 
-func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
+func extractKey(mi *pipepb.MonitoringInfo, pcolToTransform map[string]string) (metrics.StepKey, error) {
        labels := newLabels(mi.GetLabels())
        stepName := labels.Transform()
 
+       if v, ok := pcolToTransform[labels.PCollection()]; ok {
+               stepName = v
+       }
        if stepName == "" {
                return metrics.StepKey{}, fmt.Errorf("Failed to deduce Step from MonitoringInfo: %v", mi)
        }
@@ -150,8 +183,15 @@ func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
 }
 
 func newLabels(miLabels map[string]string) *metrics.Labels {
-       labels := metrics.UserLabels(miLabels["PTRANSFORM"], miLabels["NAMESPACE"], miLabels["NAME"])
-       return &labels
+       if miLabels["PTRANSFORM"] != "" {
+               labels := metrics.UserLabels(miLabels["PTRANSFORM"], miLabels["NAMESPACE"], miLabels["NAME"])
+               return &labels
+       }
+       if miLabels["PCOLLECTION"] != "" {
+               labels := metrics.PCollectionLabels(miLabels["PCOLLECTION"])
+               return &labels
+       }
+       return &metrics.Labels{}
 }
 
 func decodeMany(reader *bytes.Reader, size int) ([]int64, error) {
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
index d5406da..20492ab 100644
--- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -55,8 +55,9 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
 
        attempted := []*pipepb.MonitoringInfo{mInfo}
        committed := []*pipepb.MonitoringInfo{}
+       p := &pipepb.Pipeline{}
 
-       got := FromMonitoringInfos(attempted, committed).AllMetrics().Counters()
+       got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Counters()
        size := len(got)
        if size < 1 {
                t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
@@ -104,8 +105,9 @@ func TestFromMonitoringInfos_Distributions(t *testing.T) {
 
        attempted := []*pipepb.MonitoringInfo{mInfo}
        committed := []*pipepb.MonitoringInfo{}
+       p := &pipepb.Pipeline{}
 
-       got := FromMonitoringInfos(attempted, committed).AllMetrics().Distributions()
+       got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Distributions()
        size := len(got)
        if size < 1 {
                t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
@@ -153,8 +155,9 @@ func TestFromMonitoringInfos_Gauges(t *testing.T) {
 
        attempted := []*pipepb.MonitoringInfo{mInfo}
        committed := []*pipepb.MonitoringInfo{}
+       p := &pipepb.Pipeline{}
 
-       got := FromMonitoringInfos(attempted, committed).AllMetrics().Gauges()
+       got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Gauges()
        size := len(got)
        if size < 1 {
                t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
index b17296f..f4ccf5f 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/metrics.go
@@ -33,7 +33,7 @@ func FromMetricUpdates(allMetrics []*df.MetricUpdate, p *pipepb.Pipeline) *metri
        ac, ad := groupByType(allMetrics, p, true)
        cc, cd := groupByType(allMetrics, p, false)
 
-       return metrics.NewResults(metrics.MergeCounters(ac, cc), metrics.MergeDistributions(ad, cd), make([]metrics.GaugeResult, 0), make([]metrics.MsecResult, 0))
+       return metrics.NewResults(metrics.MergeCounters(ac, cc), metrics.MergeDistributions(ad, cd), make([]metrics.GaugeResult, 0), make([]metrics.MsecResult, 0), make([]metrics.PColResult, 0))
 }
 
 func groupByType(allMetrics []*df.MetricUpdate, p *pipepb.Pipeline, tentative bool) (
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 2d7672a..c7371db 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -94,7 +94,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
        }
        err = WaitForCompletion(ctx, client, jobID)
 
-       res, presultErr := newUniversalPipelineResult(ctx, jobID, client)
+       res, presultErr := newUniversalPipelineResult(ctx, jobID, client, p)
        if presultErr != nil {
                if err != nil {
                        return presult, errors.Wrap(err, presultErr.Error())
@@ -109,7 +109,7 @@ type universalPipelineResult struct {
        metrics *metrics.Results
 }
 
-func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*universalPipelineResult, error) {
+func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.JobServiceClient, p *pipepb.Pipeline) (*universalPipelineResult, error) {
        request := &jobpb.GetJobMetricsRequest{JobId: jobID}
        response, err := client.GetJobMetrics(ctx, request)
        if err != nil {
@@ -117,7 +117,7 @@ func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.
        }
 
        monitoredStates := response.GetMetrics()
-       metrics := metricsx.FromMonitoringInfos(monitoredStates.Attempted, monitoredStates.Committed)
+       metrics := metricsx.FromMonitoringInfos(p, monitoredStates.Attempted, monitoredStates.Committed)
        return &universalPipelineResult{jobID, metrics}, nil
 }
 

Reply via email to