lostluck commented on a change in pull request #15923:
URL: https://github.com/apache/beam/pull/15923#discussion_r745926957



##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -150,7 +183,7 @@ func extractGaugeValue(reader *bytes.Reader) 
(metrics.GaugeValue, error) {
 }
 
 func newLabels(miLabels map[string]string) *metrics.Labels {
-       labels := metrics.UserLabels(miLabels["PTRANSFORM"], 
miLabels["NAMESPACE"], miLabels["NAME"])
+       labels := metrics.UserLabels(miLabels["PTRANSFORM"], 
miLabels["NAMESPACE"], miLabels["NAME"], miLabels["PCOLLECTION"])

Review comment:
       Related to the comment about modifying UserLabels, consider using whether 
PCollection is populated to determine if `metrics.PCollectionLabels` is used 
instead.

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -42,8 +42,8 @@ func (l Labels) Name() string { return l.name }
 
 // UserLabels builds a Labels for user metrics.
 // Intended for framework use.
-func UserLabels(transform, namespace, name string) Labels {
-       return Labels{transform: transform, namespace: namespace, name: name}
+func UserLabels(transform, namespace, name, pcollection string) Labels {

Review comment:
       Why modify this function instead of calling `PCollectionLabels` below, 
or making a new one?
   
   Note how there are significantly more uses where you've modified the 
existing code with a "" parameter, vs ones where you've populated the parameter.

##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -28,25 +28,37 @@ import (
 
 // FromMonitoringInfos extracts metrics from monitored states and
 // groups them into counters, distributions and gauges.
-func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) *metrics.Results {
-       ac, ad, ag, am := groupByType(attempted)
-       cc, cd, cg, cm := groupByType(committed)
+func FromMonitoringInfos(p *pipepb.Pipeline, attempted 
[]*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) *metrics.Results {
+       ac, ad, ag, am, ap := groupByType(p, attempted)
+       cc, cd, cg, cm, cp := groupByType(p, committed)
 
-       return metrics.NewResults(metrics.MergeCounters(ac, cc), 
metrics.MergeDistributions(ad, cd), metrics.MergeGauges(ag, cg), 
metrics.MergeMsecs(am, cm))
+       return metrics.NewResults(metrics.MergeCounters(ac, cc), 
metrics.MergeDistributions(ad, cd), metrics.MergeGauges(ag, cg), 
metrics.MergeMsecs(am, cm), metrics.MergePCols(ap, cp))
 }
 
-func groupByType(minfos []*pipepb.MonitoringInfo) (
+func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
        map[metrics.StepKey]int64,
        map[metrics.StepKey]metrics.DistributionValue,
        map[metrics.StepKey]metrics.GaugeValue,
-       map[metrics.StepKey]metrics.MsecValue) {
+       map[metrics.StepKey]metrics.MsecValue,
+       map[metrics.StepKey]metrics.PColValue) {
        counters := make(map[metrics.StepKey]int64)
        distributions := make(map[metrics.StepKey]metrics.DistributionValue)
        gauges := make(map[metrics.StepKey]metrics.GaugeValue)
        msecs := make(map[metrics.StepKey]metrics.MsecValue)
+       pcols := make(map[metrics.StepKey]metrics.PColValue)
+
+       // extract pcol for a PTransform into a map from pipeline proto.
+       pcolTransform := make(map[string]string)
+
+       for _, transform := range p.GetComponents().GetTransforms() {
+               outputs := transform.GetOutputs()
+               for _, pid := range outputs {
+                       pcolTransform[pid] = transform.GetUniqueName()

Review comment:
       Unfortunately, we can't ignore the output key here. Multiple 
PCollections can be output from the same PTransform, and will have distinct 
stats, so we need to include the uniquifier.
   ```suggestion
                for o, pid := range outputs {
                        pcolTransform[pid] = fmt.Sprintf("%s.%s", 
transform.GetUniqueName(), o)
   ```
   
   This will prevent the earlier code from mis-merging PCollection stats too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to