lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r520036345



##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -13,10 +13,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package harness
+package monitoring

Review comment:
       Please move the new package to under beam/core/runtime/monitoring 
instead of just in raw core. runtime is where code that depends on the protocol 
buffers tends to live.
   
   I'm not against a new package for this handling, but I think I'm against 
moving the code that is harness only being moved out of the harness. That code 
isn't intended to be part of any external API. There's also nothing relating 
the code beyond "they use the MonitoringInfos". That's not reason enough to put 
the code into the same package. To match the other convention around protos, 
perhaps we can call the package "metricsx" to go along with runtime/graphx 
which also handles protocol buffers.
   
   Further, WRT Testing, you only seem to be using the encoding functions. 
Please simply copy those helpers into the test file instead of trying to 
deduplicate. A little bit of copying is better than a little bit of dependency.

##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -199,11 +201,11 @@ func getShortID(l metrics.Labels, urn mUrn) string {
        return defaultShortIDCache.getShortID(l, urn)
 }
 
-func shortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
+func ShortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {

Review comment:
       Every exported identifier should have a doc comment.
   
   https://golang.org/doc/effective_go.html#commentary
   
   // ShortIDsToInfos translates a slice shortids from a monitoring request to 
their fully qualified MonitoringInfos
   // based on the contents of the defaultShortIDCache.

##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) 
([]byte, error) {
        }
        return buf.Bytes(), nil
 }
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) (
+       []metrics.CounterResult,
+       []metrics.DistributionResult,
+       []metrics.GaugeResult) {

Review comment:
       Why not return a metrics.Results instead? Is the intent to force a 
breaking change if a new class of results is added?
   
   Conversely, by using a metrics.Results, no such breaking change needs to 
happen, and things can change more gracefully (such as when generics land in 
Go).

##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -199,11 +201,11 @@ func getShortID(l metrics.Labels, urn mUrn) string {
        return defaultShortIDCache.getShortID(l, urn)
 }
 
-func shortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
+func ShortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
        return defaultShortIDCache.shortIdsToInfos(shortids)
 }
 
-func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
+func Monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {

Review comment:
       Every exported identifier should have a doc comment.
   
   https://golang.org/doc/effective_go.html#commentary
   
   // Monitoring extracts and translates metrics from an execution plan into 
the associated MonitoringInfos and short id mappings.

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
        defer m.mu.Unlock()
        return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+       Value     int64
+       Timestamp time.Time
+}
+
+// MetricResults queries for all metric values that match a given filter.
+type MetricResults struct {

Review comment:
       Please drop the "Metric" prefix when putting things into the metrics 
package.
   Otherwise users of this type will always be typing metrics.MetricResults.
   

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
        defer m.mu.Unlock()
        return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+       Value     int64
+       Timestamp time.Time
+}
+
+// MetricResults queries for all metric values that match a given filter.
+type MetricResults struct {
+       Counters      []CounterResult
+       Distributions []DistributionResult
+       Gauges        []GaugeResult
+}
+
+// AllMetrics returns all metrics from a MetricResults instance.
+func (mr MetricResults) AllMetrics() MetricQueryResults {
+       return MetricQueryResults{mr.Counters, mr.Distributions, mr.Gauges}
+}
+
+// TODO: Implement Query(MetricsFilter) and metrics filtering
+
+// MetricQueryResults is the results of a query. Allows accessing all of the
+// metrics that matched the filter.
+type MetricQueryResults struct {

Review comment:
       Please drop the "Metric" prefix when putting things into the metrics 
package.
   Otherwise users of this type will always be typing metrics.MetricQueryResult.
   

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
        defer m.mu.Unlock()
        return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+       Value     int64
+       Timestamp time.Time
+}
+
+// MetricResults queries for all metric values that match a given filter.
+type MetricResults struct {
+       Counters      []CounterResult
+       Distributions []DistributionResult
+       Gauges        []GaugeResult
+}
+
+// AllMetrics returns all metrics from a MetricResults instance.
+func (mr MetricResults) AllMetrics() MetricQueryResults {
+       return MetricQueryResults{mr.Counters, mr.Distributions, mr.Gauges}
+}
+
+// TODO: Implement Query(MetricsFilter) and metrics filtering
+
+// MetricQueryResults is the results of a query. Allows accessing all of the
+// metrics that matched the filter.
+type MetricQueryResults struct {
+       counters      []CounterResult
+       distributions []DistributionResult
+       gauges        []GaugeResult
+}
+
+// GetCounters returns an array of counter metrics.
+func (qr MetricQueryResults) GetCounters() []CounterResult {
+       return qr.counters
+}
+
+// GetDistributions returns an array of distribution metrics.
+func (qr MetricQueryResults) GetDistributions() []DistributionResult {
+       return qr.distributions
+}
+
+// GetGauges returns an array of gauges metrics.
+func (qr MetricQueryResults) GetGauges() []GaugeResult {
+       return qr.gauges
+}
+
+// CounterResult is an attempted and a commited value of a Counter metric plus
+// key.
+type CounterResult struct {
+       Attempted, Committed int64
+       Key                  MetricKey
+}
+
+// DistributionResult is an attempted and a commited value of a Distribution
+// metric plus key.
+type DistributionResult struct {
+       Attempted, Committed DistributionValue
+       Key                  MetricKey
+}
+
+// GaugeResult is an attempted and a commited value of a Gauge metric plus
+// key.
+type GaugeResult struct {
+       Attempted, Committed GaugeValue
+       Key                  MetricKey
+}
+
+// MetricKey includes the namespace and the name of the metric, as well as
+// the step that reported the metric.
+type MetricKey struct {

Review comment:
       Please drop the "Metric" prefix when putting things into the metrics 
package.
   Otherwise users of this type will always be typing metrics.MetricKey
   
   
   There are also metrics that aren't associated with DoFns (eg PCollection 
specific ones), that may warrant different key types, rather than a "global" 
key.  Maybe DoFnKey or StepKey?
   
   Consider documenting what this is used for, rather than what it is. "StepKey 
uniquely identifies a metric"
   
   

##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) 
([]byte, error) {
        }
        return buf.Bytes(), nil
 }
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) (
+       []metrics.CounterResult,
+       []metrics.DistributionResult,
+       []metrics.GaugeResult) {
+       ac, ad, ag := groupByType(attempted)
+       cc, cd, cg := groupByType(committed)
+
+       c := mergeCounters(ac, cc)
+       d := mergeDistributions(ad, cd)
+       g := mergeGauges(ag, cg)
+
+       return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+       switch mi.GetType() {
+       case
+               "beam:metrics:latest_int64:v1",
+               "beam:metrics:top_n_int64:v1",
+               "beam:metrics:bottom_n_int64:v1":
+               return true
+       }
+       return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+       map[metrics.MetricKey]int64,
+       map[metrics.MetricKey]metrics.DistributionValue,
+       map[metrics.MetricKey]metrics.GaugeValue) {
+       counters := make(map[metrics.MetricKey]int64)
+       distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+       gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+       for _, minfo := range minfos {
+               key, err := extractKey(minfo)
+               if err != nil {
+                       log.Println(err)
+                       continue
+               }
+
+               r := bytes.NewReader(minfo.GetPayload())
+
+               if IsCounter(minfo) {

Review comment:
       Same comment that I had before about consider changing this if-else 
ladder into a switch/case statement.

##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) 
([]byte, error) {
        }
        return buf.Bytes(), nil
 }
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) (
+       []metrics.CounterResult,
+       []metrics.DistributionResult,
+       []metrics.GaugeResult) {
+       ac, ad, ag := groupByType(attempted)
+       cc, cd, cg := groupByType(committed)
+
+       c := mergeCounters(ac, cc)
+       d := mergeDistributions(ad, cd)
+       g := mergeGauges(ag, cg)
+
+       return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+       switch mi.GetType() {
+       case
+               "beam:metrics:latest_int64:v1",
+               "beam:metrics:top_n_int64:v1",
+               "beam:metrics:bottom_n_int64:v1":
+               return true
+       }
+       return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+       map[metrics.MetricKey]int64,
+       map[metrics.MetricKey]metrics.DistributionValue,
+       map[metrics.MetricKey]metrics.GaugeValue) {
+       counters := make(map[metrics.MetricKey]int64)
+       distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+       gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+       for _, minfo := range minfos {
+               key, err := extractKey(minfo)
+               if err != nil {
+                       log.Println(err)
+                       continue
+               }
+
+               r := bytes.NewReader(minfo.GetPayload())
+
+               if IsCounter(minfo) {
+                       value, err := extractCounterValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       counters[key] = value
+               } else if IsDistribution(minfo) {
+                       value, err := extractDistributionValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       distributions[key] = value
+               } else if IsGauge(minfo) {
+                       value, err := extractGaugeValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       gauges[key] = value
+               } else {
+                       log.Println("unknown metric type")
+               }
+       }
+       return counters, distributions, gauges
+}
+
+func mergeCounters(
+       attempted map[metrics.MetricKey]int64,
+       committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+       res := make([]metrics.CounterResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = -1
+               }
+               res = append(res, metrics.CounterResult{Attempted: 
attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeDistributions(
+       attempted map[metrics.MetricKey]metrics.DistributionValue,
+       committed map[metrics.MetricKey]metrics.DistributionValue) 
[]metrics.DistributionResult {
+       res := make([]metrics.DistributionResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.DistributionValue{}
+               }
+               res = append(res, metrics.DistributionResult{Attempted: 
attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeGauges(
+       attempted map[metrics.MetricKey]metrics.GaugeValue,
+       committed map[metrics.MetricKey]metrics.GaugeValue) 
[]metrics.GaugeResult {
+       res := make([]metrics.GaugeResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.GaugeValue{}
+               }
+               res = append(res, metrics.GaugeResult{Attempted: attempted[k], 
Committed: v, Key: k})
+       }
+       return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+       labels := newLabels(mi.GetLabels())
+       stepName := getStepName(labels)
+       if stepName == "" {
+               return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step 
from MonitoringInfo: %v", mi)
+       }
+       return metrics.MetricKey{Step: stepName, Name: labels.Name(), 
Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+       value, err := coder.DecodeVarInt(reader)
+       if err != nil {
+               return -1, err
+       }
+       return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) 
(metrics.DistributionValue, error) {
+       values, err := decodeMany(reader, 4)
+       if err != nil {
+               return metrics.DistributionValue{}, err
+       }
+       return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: 
values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+       values, err := decodeMany(reader, 2)
+       if err != nil {
+               return metrics.GaugeValue{}, err
+       }
+       return metrics.GaugeValue{Timestamp: time.Unix(0, 
values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+       labels := metrics.UserLabels(
+               miLabels["PTRANSFORM"],
+               miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],

Review comment:
       Consider just using the constants"NAMESPACE" and "NAME". Or go the other 
way, and pull PTRANSFORM from the enum instead. The inconsistency is odd.

##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) 
([]byte, error) {
        }
        return buf.Bytes(), nil
 }
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed 
[]*pipepb.MonitoringInfo) (
+       []metrics.CounterResult,
+       []metrics.DistributionResult,
+       []metrics.GaugeResult) {
+       ac, ad, ag := groupByType(attempted)
+       cc, cd, cg := groupByType(committed)
+
+       c := mergeCounters(ac, cc)
+       d := mergeDistributions(ad, cd)
+       g := mergeGauges(ag, cg)
+
+       return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+       return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+       switch mi.GetType() {
+       case
+               "beam:metrics:latest_int64:v1",
+               "beam:metrics:top_n_int64:v1",
+               "beam:metrics:bottom_n_int64:v1":
+               return true
+       }
+       return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+       map[metrics.MetricKey]int64,
+       map[metrics.MetricKey]metrics.DistributionValue,
+       map[metrics.MetricKey]metrics.GaugeValue) {
+       counters := make(map[metrics.MetricKey]int64)
+       distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+       gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+       for _, minfo := range minfos {
+               key, err := extractKey(minfo)
+               if err != nil {
+                       log.Println(err)
+                       continue
+               }
+
+               r := bytes.NewReader(minfo.GetPayload())
+
+               if IsCounter(minfo) {
+                       value, err := extractCounterValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       counters[key] = value
+               } else if IsDistribution(minfo) {
+                       value, err := extractDistributionValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       distributions[key] = value
+               } else if IsGauge(minfo) {
+                       value, err := extractGaugeValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       gauges[key] = value
+               } else {
+                       log.Println("unknown metric type")
+               }
+       }
+       return counters, distributions, gauges
+}
+
+func mergeCounters(
+       attempted map[metrics.MetricKey]int64,
+       committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+       res := make([]metrics.CounterResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = -1
+               }
+               res = append(res, metrics.CounterResult{Attempted: 
attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeDistributions(
+       attempted map[metrics.MetricKey]metrics.DistributionValue,
+       committed map[metrics.MetricKey]metrics.DistributionValue) 
[]metrics.DistributionResult {
+       res := make([]metrics.DistributionResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.DistributionValue{}
+               }
+               res = append(res, metrics.DistributionResult{Attempted: 
attempted[k], Committed: v, Key: k})
+       }
+       return res
+}
+
+func mergeGauges(
+       attempted map[metrics.MetricKey]metrics.GaugeValue,
+       committed map[metrics.MetricKey]metrics.GaugeValue) 
[]metrics.GaugeResult {
+       res := make([]metrics.GaugeResult, 0)
+
+       for k := range attempted {
+               v, ok := committed[k]
+               if !ok {
+                       v = metrics.GaugeValue{}
+               }
+               res = append(res, metrics.GaugeResult{Attempted: attempted[k], 
Committed: v, Key: k})
+       }
+       return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+       labels := newLabels(mi.GetLabels())
+       stepName := getStepName(labels)
+       if stepName == "" {
+               return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step 
from MonitoringInfo: %v", mi)
+       }
+       return metrics.MetricKey{Step: stepName, Name: labels.Name(), 
Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+       value, err := coder.DecodeVarInt(reader)
+       if err != nil {
+               return -1, err
+       }
+       return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) 
(metrics.DistributionValue, error) {
+       values, err := decodeMany(reader, 4)
+       if err != nil {
+               return metrics.DistributionValue{}, err
+       }
+       return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: 
values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+       values, err := decodeMany(reader, 2)
+       if err != nil {
+               return metrics.GaugeValue{}, err
+       }
+       return metrics.GaugeValue{Timestamp: time.Unix(0, 
values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+       labels := metrics.UserLabels(
+               miLabels["PTRANSFORM"],
+               miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],
+               miLabels[pipepb.MonitoringInfo_NAME.String()])
+       return &labels
+}
+
+func getStepName(labels *metrics.Labels) string {
+       return labels.Transform()
+}

Review comment:
       This function doesn't do anything other than serve as a comment. 
Consider simply calling labels.Transform() directly at the only call site.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to