lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r520036345
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -13,10 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package harness
+package monitoring
Review comment:
Please move the new package under beam/core/runtime/monitoring instead of
directly under core; runtime is where code that depends on the protocol
buffers tends to live.
I'm not against a new package for this handling, but I am against moving
harness-only code out of the harness. That code isn't intended to be part of
any external API, and there's nothing relating the pieces beyond "they use
MonitoringInfos", which isn't reason enough to put them in the same package.
To match the existing convention around protos, perhaps we can call the
package "metricsx" to go along with runtime/graphx, which also handles
protocol buffers.
Further, with regard to testing, you only seem to be using the encoding
functions. Please simply copy those helpers into the test file instead of
trying to deduplicate: a little bit of copying is better than a little bit of
dependency.
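Something like this, purely as a sketch (assuming the test only needs the
distribution encoding; the body just mirrors the four-varint layout that
decodeMany(reader, 4) reads back, and uses the SDK's coder.EncodeVarInt):

```go
// In the test file that needs it: a local copy of the encoder rather than a
// cross-package dependency. Hypothetical copy; adjust to whichever helpers
// the test actually uses. Imports: "bytes" and beam/core/graph/coder.
func int64Distribution(count, sum, min, max int64) ([]byte, error) {
	var buf bytes.Buffer
	for _, v := range []int64{count, sum, min, max} {
		if err := coder.EncodeVarInt(v, &buf); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```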
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -199,11 +201,11 @@ func getShortID(l metrics.Labels, urn mUrn) string {
return defaultShortIDCache.getShortID(l, urn)
}
-func shortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
+func ShortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
Review comment:
Every exported identifier should have a doc comment.
https://golang.org/doc/effective_go.html#commentary
// ShortIDsToInfos translates a slice of shortids from a monitoring request
// to their fully qualified MonitoringInfos, based on the contents of the
// defaultShortIDCache.
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
}
return buf.Bytes(), nil
}
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+ []metrics.CounterResult,
+ []metrics.DistributionResult,
+ []metrics.GaugeResult) {
Review comment:
Why not return a metrics.Results instead? Is the intent to force a
breaking change if a new class of results is added?
Conversely, by using a metrics.Results, no such breaking change needs to
happen, and things can change more gracefully (such as when generics land in
Go).
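For instance (a sketch only, assuming the MetricResults struct in metrics.go
is renamed to Results per the naming comments on that file, keeping its
exported Counters/Distributions/Gauges fields):

```go
// FromMonitoringInfos extracts metrics from a GetJobMetrics response and
// groups them into counters, distributions and gauges.
func FromMonitoringInfos(attempted, committed []*pipepb.MonitoringInfo) metrics.Results {
	ac, ad, ag := groupByType(attempted)
	cc, cd, cg := groupByType(committed)
	return metrics.Results{
		Counters:      mergeCounters(ac, cc),
		Distributions: mergeDistributions(ad, cd),
		Gauges:        mergeGauges(ag, cg),
	}
}
```

That way, adding a new class of results later only grows the struct instead of
changing every caller's signature.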
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -199,11 +201,11 @@ func getShortID(l metrics.Labels, urn mUrn) string {
return defaultShortIDCache.getShortID(l, urn)
}
-func shortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
+func ShortIdsToInfos(shortids []string) map[string]*pipepb.MonitoringInfo {
return defaultShortIDCache.shortIdsToInfos(shortids)
}
-func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
+func Monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
Review comment:
Every exported identifier should have a doc comment.
https://golang.org/doc/effective_go.html#commentary
// Monitoring extracts and translates metrics from an execution plan into
// the associated MonitoringInfos and short id mappings.
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
defer m.mu.Unlock()
return m.v, m.t
}
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+ Value int64
+ Timestamp time.Time
+}
+
+// MetricResults queries for all metric values that match a given filter.
+type MetricResults struct {
Review comment:
Please drop the "Metric" prefix when putting things into the metrics
package.
Otherwise users of this type will always be typing metrics.MetricResults.
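e.g. (sketch), so call sites read metrics.Results:

```go
// Results queries for all metric values that match a given filter.
type Results struct {
	Counters      []CounterResult
	Distributions []DistributionResult
	Gauges        []GaugeResult
}
```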
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
defer m.mu.Unlock()
return m.v, m.t
}
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+ Value int64
+ Timestamp time.Time
+}
+
+// MetricResults queries for all metric values that match a given filter.
+type MetricResults struct {
+ Counters []CounterResult
+ Distributions []DistributionResult
+ Gauges []GaugeResult
+}
+
+// AllMetrics returns all metrics from a MetricResults instance.
+func (mr MetricResults) AllMetrics() MetricQueryResults {
+ return MetricQueryResults{mr.Counters, mr.Distributions, mr.Gauges}
+}
+
+// TODO: Implement Query(MetricsFilter) and metrics filtering
+
+// MetricQueryResults is the results of a query. Allows accessing all of the
+// metrics that matched the filter.
+type MetricQueryResults struct {
Review comment:
Please drop the "Metric" prefix when putting things into the metrics
package.
Otherwise users of this type will always be typing metrics.MetricQueryResults.
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
defer m.mu.Unlock()
return m.v, m.t
}
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+ Value int64
+ Timestamp time.Time
+}
+
+// MetricResults queries for all metric values that match a given filter.
+type MetricResults struct {
+ Counters []CounterResult
+ Distributions []DistributionResult
+ Gauges []GaugeResult
+}
+
+// AllMetrics returns all metrics from a MetricResults instance.
+func (mr MetricResults) AllMetrics() MetricQueryResults {
+ return MetricQueryResults{mr.Counters, mr.Distributions, mr.Gauges}
+}
+
+// TODO: Implement Query(MetricsFilter) and metrics filtering
+
+// MetricQueryResults is the results of a query. Allows accessing all of the
+// metrics that matched the filter.
+type MetricQueryResults struct {
+ counters []CounterResult
+ distributions []DistributionResult
+ gauges []GaugeResult
+}
+
+// GetCounters returns an array of counter metrics.
+func (qr MetricQueryResults) GetCounters() []CounterResult {
+ return qr.counters
+}
+
+// GetDistributions returns an array of distribution metrics.
+func (qr MetricQueryResults) GetDistributions() []DistributionResult {
+ return qr.distributions
+}
+
+// GetGauges returns an array of gauge metrics.
+func (qr MetricQueryResults) GetGauges() []GaugeResult {
+ return qr.gauges
+}
+
+// CounterResult is an attempted and a committed value of a Counter metric plus
+// key.
+type CounterResult struct {
+ Attempted, Committed int64
+ Key MetricKey
+}
+
+// DistributionResult is an attempted and a committed value of a Distribution
+// metric plus key.
+type DistributionResult struct {
+ Attempted, Committed DistributionValue
+ Key MetricKey
+}
+
+// GaugeResult is an attempted and a committed value of a Gauge metric plus
+// key.
+type GaugeResult struct {
+ Attempted, Committed GaugeValue
+ Key MetricKey
+}
+
+// MetricKey includes the namespace and the name of the metric, as well as
+// the step that reported the metric.
+type MetricKey struct {
Review comment:
Please drop the "Metric" prefix when putting things into the metrics
package.
Otherwise users of this type will always be typing metrics.MetricKey.
There are also metrics that aren't associated with DoFns (e.g. PCollection-specific
ones) that may warrant different key types, rather than a "global" key. Maybe
DoFnKey or StepKey?
Consider documenting what this is used for, rather than what it is: "StepKey
uniquely identifies a metric."
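Roughly (a sketch, reusing the fields extractKey already populates):

```go
// StepKey uniquely identifies a metric within a pipeline step.
type StepKey struct {
	Step, Name, Namespace string
}
```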
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
}
return buf.Bytes(), nil
}
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+ []metrics.CounterResult,
+ []metrics.DistributionResult,
+ []metrics.GaugeResult) {
+ ac, ad, ag := groupByType(attempted)
+ cc, cd, cg := groupByType(committed)
+
+ c := mergeCounters(ac, cc)
+ d := mergeDistributions(ad, cd)
+ g := mergeGauges(ag, cg)
+
+ return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+ switch mi.GetType() {
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ return true
+ }
+ return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[metrics.MetricKey]int64,
+ map[metrics.MetricKey]metrics.DistributionValue,
+ map[metrics.MetricKey]metrics.GaugeValue) {
+ counters := make(map[metrics.MetricKey]int64)
+ distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+ gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ if IsCounter(minfo) {
Review comment:
Same comment as before: consider changing this if-else ladder into a
switch/case statement.
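i.e. the loop body could read (same identifiers as in this diff, just
reshaped; a sketch):

```go
switch {
case IsCounter(minfo):
	value, err := extractCounterValue(r)
	if err != nil {
		log.Println(err)
		continue
	}
	counters[key] = value
case IsDistribution(minfo):
	value, err := extractDistributionValue(r)
	if err != nil {
		log.Println(err)
		continue
	}
	distributions[key] = value
case IsGauge(minfo):
	value, err := extractGaugeValue(r)
	if err != nil {
		log.Println(err)
		continue
	}
	gauges[key] = value
default:
	log.Println("unknown metric type")
}
```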
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
}
return buf.Bytes(), nil
}
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+ []metrics.CounterResult,
+ []metrics.DistributionResult,
+ []metrics.GaugeResult) {
+ ac, ad, ag := groupByType(attempted)
+ cc, cd, cg := groupByType(committed)
+
+ c := mergeCounters(ac, cc)
+ d := mergeDistributions(ad, cd)
+ g := mergeGauges(ag, cg)
+
+ return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+ switch mi.GetType() {
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ return true
+ }
+ return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[metrics.MetricKey]int64,
+ map[metrics.MetricKey]metrics.DistributionValue,
+ map[metrics.MetricKey]metrics.GaugeValue) {
+ counters := make(map[metrics.MetricKey]int64)
+ distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+ gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ if IsCounter(minfo) {
+ value, err := extractCounterValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ counters[key] = value
+ } else if IsDistribution(minfo) {
+ value, err := extractDistributionValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ distributions[key] = value
+ } else if IsGauge(minfo) {
+ value, err := extractGaugeValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ gauges[key] = value
+ } else {
+ log.Println("unknown metric type")
+ }
+ }
+ return counters, distributions, gauges
+}
+
+func mergeCounters(
+ attempted map[metrics.MetricKey]int64,
+ committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+ res := make([]metrics.CounterResult, 0)
+
+ for k := range attempted {
+ v, ok := committed[k]
+ if !ok {
+ v = -1
+ }
+                res = append(res, metrics.CounterResult{Attempted: attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func mergeDistributions(
+ attempted map[metrics.MetricKey]metrics.DistributionValue,
+        committed map[metrics.MetricKey]metrics.DistributionValue) []metrics.DistributionResult {
+ res := make([]metrics.DistributionResult, 0)
+
+ for k := range attempted {
+ v, ok := committed[k]
+ if !ok {
+ v = metrics.DistributionValue{}
+ }
+                res = append(res, metrics.DistributionResult{Attempted: attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func mergeGauges(
+ attempted map[metrics.MetricKey]metrics.GaugeValue,
+        committed map[metrics.MetricKey]metrics.GaugeValue) []metrics.GaugeResult {
+ res := make([]metrics.GaugeResult, 0)
+
+ for k := range attempted {
+ v, ok := committed[k]
+ if !ok {
+ v = metrics.GaugeValue{}
+ }
+                res = append(res, metrics.GaugeResult{Attempted: attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+ labels := newLabels(mi.GetLabels())
+ stepName := getStepName(labels)
+ if stepName == "" {
+                return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step from MonitoringInfo: %v", mi)
+ }
+        return metrics.MetricKey{Step: stepName, Name: labels.Name(), Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+ value, err := coder.DecodeVarInt(reader)
+ if err != nil {
+ return -1, err
+ }
+ return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+ values, err := decodeMany(reader, 4)
+ if err != nil {
+ return metrics.DistributionValue{}, err
+ }
+        return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+ values, err := decodeMany(reader, 2)
+ if err != nil {
+ return metrics.GaugeValue{}, err
+ }
+        return metrics.GaugeValue{Timestamp: time.Unix(0, values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+ labels := metrics.UserLabels(
+ miLabels["PTRANSFORM"],
+ miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],
Review comment:
Consider just using the constants "NAMESPACE" and "NAME". Or go the other
way and pull PTRANSFORM from the enum instead. The inconsistency is odd.
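e.g. the string-constant version would read (sketch):

```go
labels := metrics.UserLabels(
	miLabels["PTRANSFORM"],
	miLabels["NAMESPACE"],
	miLabels["NAME"])
```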
##########
File path: sdks/go/pkg/beam/core/monitoring/monitoring.go
##########
@@ -341,3 +343,189 @@ func int64Distribution(count, sum, min, max int64) ([]byte, error) {
}
return buf.Bytes(), nil
}
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(attempted []*pipepb.MonitoringInfo, committed []*pipepb.MonitoringInfo) (
+ []metrics.CounterResult,
+ []metrics.DistributionResult,
+ []metrics.GaugeResult) {
+ ac, ad, ag := groupByType(attempted)
+ cc, cd, cg := groupByType(committed)
+
+ c := mergeCounters(ac, cc)
+ d := mergeDistributions(ad, cd)
+ g := mergeGauges(ag, cg)
+
+ return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+ switch mi.GetType() {
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ return true
+ }
+ return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[metrics.MetricKey]int64,
+ map[metrics.MetricKey]metrics.DistributionValue,
+ map[metrics.MetricKey]metrics.GaugeValue) {
+ counters := make(map[metrics.MetricKey]int64)
+ distributions := make(map[metrics.MetricKey]metrics.DistributionValue)
+ gauges := make(map[metrics.MetricKey]metrics.GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ if IsCounter(minfo) {
+ value, err := extractCounterValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ counters[key] = value
+ } else if IsDistribution(minfo) {
+ value, err := extractDistributionValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ distributions[key] = value
+ } else if IsGauge(minfo) {
+ value, err := extractGaugeValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ gauges[key] = value
+ } else {
+ log.Println("unknown metric type")
+ }
+ }
+ return counters, distributions, gauges
+}
+
+func mergeCounters(
+ attempted map[metrics.MetricKey]int64,
+ committed map[metrics.MetricKey]int64) []metrics.CounterResult {
+ res := make([]metrics.CounterResult, 0)
+
+ for k := range attempted {
+ v, ok := committed[k]
+ if !ok {
+ v = -1
+ }
+                res = append(res, metrics.CounterResult{Attempted: attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func mergeDistributions(
+ attempted map[metrics.MetricKey]metrics.DistributionValue,
+        committed map[metrics.MetricKey]metrics.DistributionValue) []metrics.DistributionResult {
+ res := make([]metrics.DistributionResult, 0)
+
+ for k := range attempted {
+ v, ok := committed[k]
+ if !ok {
+ v = metrics.DistributionValue{}
+ }
+                res = append(res, metrics.DistributionResult{Attempted: attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func mergeGauges(
+ attempted map[metrics.MetricKey]metrics.GaugeValue,
+        committed map[metrics.MetricKey]metrics.GaugeValue) []metrics.GaugeResult {
+ res := make([]metrics.GaugeResult, 0)
+
+ for k := range attempted {
+ v, ok := committed[k]
+ if !ok {
+ v = metrics.GaugeValue{}
+ }
+                res = append(res, metrics.GaugeResult{Attempted: attempted[k], Committed: v, Key: k})
+ }
+ return res
+}
+
+func extractKey(mi *pipepb.MonitoringInfo) (metrics.MetricKey, error) {
+ labels := newLabels(mi.GetLabels())
+ stepName := getStepName(labels)
+ if stepName == "" {
+                return metrics.MetricKey{}, fmt.Errorf("Failed to deduce Step from MonitoringInfo: %v", mi)
+ }
+        return metrics.MetricKey{Step: stepName, Name: labels.Name(), Namespace: labels.Namespace()}, nil
+}
+
+func extractCounterValue(reader *bytes.Reader) (int64, error) {
+ value, err := coder.DecodeVarInt(reader)
+ if err != nil {
+ return -1, err
+ }
+ return value, nil
+}
+
+func extractDistributionValue(reader *bytes.Reader) (metrics.DistributionValue, error) {
+ values, err := decodeMany(reader, 4)
+ if err != nil {
+ return metrics.DistributionValue{}, err
+ }
+        return metrics.DistributionValue{Count: values[0], Sum: values[1], Min: values[2], Max: values[3]}, nil
+}
+
+func extractGaugeValue(reader *bytes.Reader) (metrics.GaugeValue, error) {
+ values, err := decodeMany(reader, 2)
+ if err != nil {
+ return metrics.GaugeValue{}, err
+ }
+        return metrics.GaugeValue{Timestamp: time.Unix(0, values[0]*int64(time.Millisecond)), Value: values[1]}, nil
+}
+
+func newLabels(miLabels map[string]string) *metrics.Labels {
+ labels := metrics.UserLabels(
+ miLabels["PTRANSFORM"],
+ miLabels[pipepb.MonitoringInfo_NAMESPACE.String()],
+ miLabels[pipepb.MonitoringInfo_NAME.String()])
+ return &labels
+}
+
+func getStepName(labels *metrics.Labels) string {
+ return labels.Transform()
+}
Review comment:
This function doesn't do anything other than serve as a comment.
Consider simply calling labels.Transform() directly at the only call site.
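i.e. extractKey would just read (sketch):

```go
labels := newLabels(mi.GetLabels())
stepName := labels.Transform()
```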
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]