lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r521694702



##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
        }
        return buf.Bytes(), nil
 }
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) (
+       []metrics.CounterResult,
+       []metrics.DistributionResult,
+       []metrics.GaugeResult) {
+       ac, ad, ag := groupByType(attempted)
+       cc, cd, cg := groupByType(committed)
+
+       c := mergeCounters(ac, cc)
+       d := mergeDistributions(ad, cd)
+       g := mergeGauges(ag, cg)
+
+       return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+       switch mi.GetType() {
+       case
+               "beam:metrics:latest_int64:v1",
+               "beam:metrics:top_n_int64:v1",
+               "beam:metrics:bottom_n_int64:v1":
+               return true
+       }
+       return false
+}
+
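+// groupByType buckets the given MonitoringInfos into counters, distributions,
+// and gauges keyed by MetricKey, logging and skipping entries it cannot decode.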
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+       map[metrics.MetricKey]int64,
+       map[metrics.MetricKey]metrics.DistributionValue,
+       map[metrics.MetricKey]metrics.GaugeValue) {
+       counters := make(map[metrics.MetricKey]int64)
+       distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+       gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+       for _, minfo := range minfos {
+               key, err := extractKey(minfo)
+               if err != nil {
+                       log.Println(err)
+                       continue
+               }
+
+               r := bytes.NewReader(minfo.GetPayload())
+
+               if IsCounter(minfo) {
+                       value, err := extractCounterValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       counters[key] = value
+               } else if IsDistribution(minfo) {
+                       value, err := extractDistributionValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       distributions[key] = value
+               } else if IsGauge(minfo) {
+                       value, err := extractGaugeValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       gauges[key] = value
+               } else {
+                       log.Println("unknown metric type")
+               }
+       }
+       return counters, distributions, gauges
+}
+
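+// mergeCounters pairs attempted and committed counter values by key; keys
+// with no committed value report -1 for Committed.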
+func mergeCounters(
+       attempted map[metrics.MetricKey]int64,
+       committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+       res := make([]metrics.CounterResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = -1
+               }
+               res = append(res, metrics.CounterResult{Attempted: attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
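+// mergeDistributions pairs attempted and committed distribution values by key;
+// keys with no committed value report a zero DistributionValue.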
+func mergeDistributions(
+       attempted map[metrics.MetricKey]metrics.DistributionValue,
+       committed map[metrics.MetricKey]metrics.DistributionValue) []metrics.DistributionResult {
+       res := make([]metrics.DistributionResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.DistributionValue{}
+               }
+               res = append(res, metrics.DistributionResult{Attempted: attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
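+// mergeGauges pairs attempted and committed gauge values by key; keys with no
+// committed value report a zero GaugeValue.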
+func mergeGauges(
+       attempted map[metrics.MetricKey]metrics.GaugeValue,
+       committed map[metrics.MetricKey]metrics.GaugeValue) []metrics.GaugeResult {
+       res := make([]metrics.GaugeResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.GaugeValue{}
+               }
+               res = append(res, metrics.GaugeResult{Attempted: attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
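+// extractKey builds a MetricKey from a MonitoringInfo's labels, or errors if
+// the step name cannot be deduced.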
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+       labels := newLabels(mi.GetLabels())
+       stepName := getStepName(labels)
+       if stepName == "" {
+               return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step from MonitoringInfo: %v", mi)
+       }
+       return metrics.MetricKey{Step: stepName, Name: labels.Name(), Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+       value, err := coder.DecodeVarInt(reader)
+       if err != nil {
+               return -1, err
+       }
+       return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+       values, err := decodeMany(reader, 4)
+       if err != nil {
+               return metrics.DistributionValue{}, err
+       }
+       return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+       values, err := decodeMany(reader, 2)
+       if err != nil {
+               return metrics.GaugeValue{}, err
+       }
+       return metrics.GaugeValue{Timestamp: time.Unix(0, values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+       labels := metrics.UserLabels(
+               miLabels["PTRANSFORM"],
+               miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],

Review comment:
       I think it's a legacy issue that hasn't migrated yet. I'm surprised; I thought lcwik handled that already. I'd add a comment about the discrepancy then. I see the existing monitoring code uses "PTRANSFORM" as well.
   
   -------
   
   Regarding the protos: Great question! Please bear with the length of this 
topic.
   
   The short answer is: that's the idiom for generated Go files: submit them.
   
   Java relies heavily on external build/package management to create jars, which contain the code generated from the protobufs.
   Python uses a one-time setup script that users run to generate that code locally, plus an abundance of external package management (pip, pyenv, etc.).
   Both also deliver everything via archives/wheels/jars.
   
   Go, on the other hand, deliberately avoids those approaches and pointedly delivers everything as open source code. This works reasonably well in this day and age of microservices. If you really want to deliver something a client runs on their machine without delivering the source, you ship a binary, or you simply spin up a service on the internet; no source needed.
   One of Go's better-touted features is that binaries are generally statically compiled, without needing specific DLLs or other libraries resident on the OS.
   While this can lead to larger binaries, that's certainly less of a problem than before, given how cheap storage and RAM are these days.
   
   Further, Go has made its way by having excellent tooling around testing, packages, and execution. Want to install a binary that's available as Go code? You `go install` it and the tool takes care of the rest. However, there are limits to that: Go's build cycle deliberately doesn't invoke other processes, including additional code generators like the proto compiler.
   There is support for pre-build-time code generation via the `go generate` command (see the sketch below).
   Specifically, and idiomatically, package users should never need to run `go generate` to get one of their received libraries to work. `go generate` is for package authors, not users.
   As a result, anything generated by that command ends up needing to be committed so that users can access it.
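
   As a rough sketch of that idiom (the package name, proto file, and protoc invocation below are illustrative assumptions, not Beam's actual generation setup): the package author keeps a directive like this in the source tree, runs `go generate ./...` locally, and commits the generated `.pb.go` output next to it.

```go
// Package examplepb holds Go code generated from a hypothetical example.proto.
// Running `go generate ./...` invokes protoc via the directive below; the
// resulting example.pb.go is committed alongside this file so that package
// users never need to run the generator themselves.
package examplepb

//go:generate protoc --go_out=. example.proto
```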
   
   So, to answer the question again: that approach isn't desirable from a user's point of view, as it forces our implementation details onto the user instead of centralizing them in the project.
   
   ----
   
   This can then devolve into Go package versioning, and the short answer there is that the Go Beam SDK doesn't currently respect versions, and won't until we migrate it to use Go Modules. Hopefully soon, after I finish Schemas and this Beam release work...



