lostluck commented on a change in pull request #15887:
URL: https://github.com/apache/beam/pull/15887#discussion_r743256891



##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -126,12 +151,12 @@ func extractCounterValue(reader *bytes.Reader) (int64, 
error) {
        return value, nil
 }
 
-func extractMsecValue(reader *bytes.Reader) (metrics.MsecValue, error) {
+func extractMsecValue(reader *bytes.Reader) (time.Duration, error) {
        value, err := coder.DecodeVarInt(reader)
        if err != nil {
-               return metrics.MsecValue{}, err
+               return time.Millisecond, err

Review comment:
       You can simply return `0` here. The compiler will turn the untyped 
integer constant to `time.Duration` for you.

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -754,6 +770,19 @@ func (r MsecResult) Result() MsecValue {
        return r.Attempted
 }
 
+// Name returns the Name of this Gauge.
+func (r MsecResult) Name() string {
+       return r.Key.Name
+}
+
+// Namespace returns the Namespace of this Gauge.
+func (r MsecResult) Namespace() string {
+       return r.Key.Namespace
+}

Review comment:
       Msec results can't have any name or namespaces (or at the very least, 
won't have them correctly populated here).  return `""` instead for now.
   
   Also, please fix the copypasta "Gauge" references.

##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -78,17 +78,54 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
                                continue
                        }
                        gauges[key] = value
-               case
-                       UrnToString(UrnStartBundle),
-                       UrnToString(UrnProcessBundle),
-                       UrnToString(UrnFinishBundle),
-                       UrnToString(UrnTransformTotalTime):
+               case UrnToString(UrnStartBundle):
                        value, err := extractMsecValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
-                       msecs[key] = value
+                       if v, ok := msecs[key]; ok {
+                               v.Start = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Start: value}
+                       }
+               case UrnToString(UrnProcessBundle):
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       if v, ok := msecs[key]; ok {
+                               v.Process = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Process: value}
+                       }
+               case UrnToString(UrnFinishBundle):
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       if v, ok := msecs[key]; ok {
+                               v.Finish = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Finish: value}
+                       }
+               case UrnToString(UrnTransformTotalTime):
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       if v, ok := msecs[key]; ok {
+                               v.Total = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Total: value}
+                       }

Review comment:
       Instead of duplicating the code out, consider instead adding a single 
inner switch statement to get to the right field to set or re-set. There's no 
need for all this boilerplate repetition.
   eg.
   1. get or create value. (note, that map values, if they aren't in a map, are 
already a 0 value.
   2. Switch statement to update the field appropriately.
   3. set the entry in the map to the updated value.
   //

##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -78,17 +78,54 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
                                continue
                        }
                        gauges[key] = value
-               case
-                       UrnToString(UrnStartBundle),
-                       UrnToString(UrnProcessBundle),
-                       UrnToString(UrnFinishBundle),
-                       UrnToString(UrnTransformTotalTime):
+               case UrnToString(UrnStartBundle):
                        value, err := extractMsecValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
-                       msecs[key] = value
+                       if v, ok := msecs[key]; ok {
+                               v.Start = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Start: value}
+                       }
+               case UrnToString(UrnProcessBundle):
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       if v, ok := msecs[key]; ok {
+                               v.Process = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Process: value}
+                       }
+               case UrnToString(UrnFinishBundle):
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       if v, ok := msecs[key]; ok {
+                               v.Finish = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Finish: value}
+                       }
+               case UrnToString(UrnTransformTotalTime):
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       if v, ok := msecs[key]; ok {
+                               v.Total = value
+                               msecs[key] = v
+                       } else {
+                               msecs[key] = metrics.MsecValue{Total: value}
+                       }

Review comment:
       Instead of duplicating the code out, consider instead adding a single 
inner switch statement to get to the right field to set or re-set. There's no 
need for all this boilerplate repetition.
   eg.
   1. get or create value. (note, that map values, if they aren't in a map, are 
already a 0 value.
   2. Switch statement to update the field appropriately.
   3. set the entry in the map to the updated value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to