kamilwu commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r520613198
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
}
return buf.Bytes(), nil
}
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+ []metrics.CounterResult,
+ []metrics.DistributionResult,
+ []metrics.GaugeResult) {
+ ac, ad, ag := groupByType(attempted)
+ cc, cd, cg := groupByType(committed)
+
+ c := mergeCounters(ac, cc)
+ d := mergeDistributions(ad, cd)
+ g := mergeGauges(ag, cg)
+
+ return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+ switch mi.GetType() {
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ return true
+ }
+ return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[metrics.MetricKey]int64,
+ map[metrics.MetricKey]metrics.DistributionValue,
+ map[metrics.MetricKey]metrics.GaugeValue) {
+ counters := make(map[metrics.MetricKey]int64)
+ distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+ gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ if IsCounter(minfo) {
Review comment:
I changed it into a switch/case statement. Do you think that using plain
string literals for the metric types is OK, or should I define a new enum-like
type to represent the different types of metrics?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]