lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r522344928
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64)
([]byte, error) {
}
return buf.Bytes(), nil
}
+
+// FromMonitoringInfos turns the attempted and committed MonitoringInfos of a
+// GetJobMetrics response into counter, distribution and gauge results.
+//
+// Each input slice is first grouped by metric type; the attempted and
+// committed groups are then merged per type.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+	[]metrics.CounterResult,
+	[]metrics.DistributionResult,
+	[]metrics.GaugeResult) {
+	attCounters, attDists, attGauges := groupByType(attempted)
+	comCounters, comDists, comGauges := groupByType(committed)
+
+	return mergeCounters(attCounters, comCounters),
+		mergeDistributions(attDists, comDists),
+		mergeGauges(attGauges, comGauges)
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge reports whether the monitoring info is a gauge metric. The latest,
+// top-n and bottom-n int64 types are all treated as gauges.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+	switch mi.GetType() {
+	case "beam:metrics:latest_int64:v1",
+		"beam:metrics:top_n_int64:v1",
+		"beam:metrics:bottom_n_int64:v1":
+		return true
+	default:
+		return false
+	}
+}
+
+// groupByType decodes each MonitoringInfo and files its value under its metric
+// key in one of three maps: counters, distributions or gauges.
+//
+// Entries whose key cannot be extracted, whose payload fails to decode, or
+// whose type is none of the three are logged and left out. When several
+// entries share a key and a type, the last one in minfos wins.
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+	map[metrics.MetricKey]int64,
+	map[metrics.MetricKey]metrics.DistributionValue,
+	map[metrics.MetricKey]metrics.GaugeValue) {
+	counters := make(map[metrics.MetricKey]int64)
+	distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+	gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+	for _, mi := range minfos {
+		key, err := extractKey(mi)
+		if err != nil {
+			log.Println(err)
+			continue
+		}
+
+		payload := bytes.NewReader(mi.GetPayload())
+
+		switch {
+		case IsCounter(mi):
+			v, err := extractCounterValue(payload)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			counters[key] = v
+		case IsDistribution(mi):
+			v, err := extractDistributionValue(payload)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			distributions[key] = v
+		case IsGauge(mi):
+			v, err := extractGaugeValue(payload)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			gauges[key] = v
+		default:
+			log.Println("unknown metric type")
+		}
+	}
+	return counters, distributions, gauges
+}
+
+// mergeCounters pairs every attempted counter with the committed counter that
+// has the same key and returns one CounterResult per attempted key.
+//
+// If a key has no committed value, Committed is set to -1. Keys that appear
+// only in committed are not included in the result. The order of the returned
+// slice follows map iteration order and is therefore unspecified.
+func mergeCounters(
+	attempted map[metrics.MetricKey]int64,
+	committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+	// The final length is known up front, so allocate once.
+	res := make([]metrics.CounterResult, 0, len(attempted))
+
+	for k, a := range attempted {
+		c, ok := committed[k]
+		if !ok {
+			// A committed 0 is a real value, so a miss needs its own marker.
+			c = -1
+		}
+		res = append(res, metrics.CounterResult{Attempted: a, Committed: c, Key: k})
+	}
+	return res
+}
+
+// mergeDistributions pairs every attempted distribution with the committed
+// distribution that has the same key and returns one DistributionResult per
+// attempted key.
+//
+// If a key has no committed value, Committed is the zero DistributionValue.
+// Keys that appear only in committed are not included in the result. The
+// order of the returned slice follows map iteration order and is therefore
+// unspecified.
+func mergeDistributions(
+	attempted map[metrics.MetricKey]metrics.DistributionValue,
+	committed map[metrics.MetricKey]metrics.DistributionValue) []metrics.DistributionResult {
+	// The final length is known up front, so allocate once.
+	res := make([]metrics.DistributionResult, 0, len(attempted))
+
+	for k, a := range attempted {
+		// Indexing a map with a missing key yields the zero value, which is
+		// exactly the placeholder wanted here; no explicit "ok" check needed.
+		res = append(res, metrics.DistributionResult{Attempted: a, Committed: committed[k], Key: k})
+	}
+	return res
+}
+
+// mergeGauges pairs every attempted gauge with the committed gauge that has
+// the same key and returns one GaugeResult per attempted key.
+//
+// If a key has no committed value, Committed is the zero GaugeValue. Keys that
+// appear only in committed are not included in the result. The order of the
+// returned slice follows map iteration order and is therefore unspecified.
+func mergeGauges(
+	attempted map[metrics.MetricKey]metrics.GaugeValue,
+	committed map[metrics.MetricKey]metrics.GaugeValue) []metrics.GaugeResult {
+	// The final length is known up front, so allocate once.
+	res := make([]metrics.GaugeResult, 0, len(attempted))
+
+	for k, a := range attempted {
+		// Indexing a map with a missing key yields the zero value, which is
+		// exactly the placeholder wanted here; no explicit "ok" check needed.
+		res = append(res, metrics.GaugeResult{Attempted: a, Committed: committed[k], Key: k})
+	}
+	return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+ labels := newLabels(mi.GetLabels())
+ stepName := getStepName(labels)
+ if stepName == "" {
+ return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step
from MonitoringInfo: %v", mi)
+ }
+ return metrics.MetricKey{Step: stepName, Name: labels.Name(),
Namespace: labels.Namespace()}, nil
+}
+
+// extractCounterValue decodes a single varint from reader and returns it as
+// the counter value. On a decode error it returns -1 together with the error.
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+	v, decodeErr := coder.DecodeVarInt(reader)
+	if decodeErr == nil {
+		return v, nil
+	}
+	return -1, decodeErr
+}
+
+// extractDistributionValue decodes four values from reader and maps them, in
+// order, to Count, Sum, Min and Max. On a decode error it returns the zero
+// DistributionValue together with the error.
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+	fields, err := decodeMany(reader, 4)
+	if err != nil {
+		return metrics.DistributionValue{}, err
+	}
+	dist := metrics.DistributionValue{
+		Count: fields[0],
+		Sum:   fields[1],
+		Min:   fields[2],
+		Max:   fields[3],
+	}
+	return dist, nil
+}
+
+// extractGaugeValue decodes two values from reader: the first becomes the
+// gauge's Timestamp and the second its Value. On a decode error it returns the
+// zero GaugeValue together with the error.
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+	fields, err := decodeMany(reader, 2)
+	if err != nil {
+		return metrics.GaugeValue{}, err
+	}
+	// The first field is scaled by time.Millisecond and handed to time.Unix
+	// as a nanosecond offset from the Unix epoch.
+	nanos := fields[0] * int64(time.Millisecond)
+	return metrics.GaugeValue{Timestamp: time.Unix(0, nanos), Value: fields[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+ labels := metrics.UserLabels(
+ miLabels["PTRANSFORM"],
+ miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],
Review comment:
This will certainly become easier once we're on Go Modules. Right now
it's tricky because we have to ensure we have the right versions of the proto
code generators. Heck, at a certain point, one could script installing protoc
and the Go proto plugin so that `go generate` for the protos is fairly
self-contained for contributors.
Nothing you've been doing in this PR requires regenerating the protos or the
code-generated type assertion "shims", so hopefully you haven't run into places
where it was implied that this was necessary.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]