lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r518966479
##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
##########
@@ -79,15 +81,79 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
jobID, err := Submit(ctx, client, prepID, token)
if err != nil {
- return "", err
+ return presult, err
}
log.Infof(ctx, "Submitted job: %v", jobID)
// (4) Wait for completion.
if async {
- return jobID, nil
+ return presult, nil
}
- return jobID, WaitForCompletion(ctx, client, jobID)
+ err = WaitForCompletion(ctx, client, jobID)
+
+ res, err := newUniversalPipelineResult(ctx, jobID, client)
+ if err != nil {
+ return presult, err
+ }
+ presult = res
+
+ return presult, err
+}
+
+type UniversalPipelineResult struct {
+ JobID string
+ metrics *UniversalMetrics
+}
+
+func newUniversalPipelineResult(ctx context.Context, jobID string, client
jobpb.JobServiceClient) (*UniversalPipelineResult, error) {
+ metrics, err := newUniversalMetrics(ctx, jobID, client)
+ if err != nil {
+ return &UniversalPipelineResult{jobID, nil}, err
+ }
+ return &UniversalPipelineResult{jobID, metrics}, err
+}
+
+func (pr UniversalPipelineResult) Metrics() metrics.MetricResults {
+ return pr.metrics
+}
+
+func newUniversalMetrics(ctx context.Context, jobID string, client
jobpb.JobServiceClient) (*UniversalMetrics, error) {
+ request := &jobpb.GetJobMetricsRequest{JobId: jobID}
+ response, err := client.GetJobMetrics(ctx, request)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to get metrics")
+ }
+ c, d, g := metrics.FromMonitoringInfos(response.GetMetrics())
+ return &UniversalMetrics{c, d, g}, err
+}
+
+type UniversalMetrics struct {
+ counters []metrics.CounterResult
+ distributions []metrics.DistributionResult
+ gauges []metrics.GaugeResult
+}
+
+func (um UniversalMetrics) Query() metrics.MetricQueryResults {
+ // TODO: Implement metrics filtering
Review comment:
For consistency and ease of finding related work, also put the same JIRA
ID in this TODO.
##########
File path: sdks/go/pkg/beam/core/metrics/monitoring_infos.go
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(resultsProto *jobpb.MetricResults) (
+ []CounterResult,
+ []DistributionResult,
+ []GaugeResult) {
+ ac, ad, ag := groupByType(resultsProto.Attempted)
+ cc, cd, cg := groupByType(resultsProto.Committed)
+
+ c := mergeCounters(ac, cc)
+ d := mergeDistributions(ad, cd)
+ g := mergeGauges(ag, cg)
+
+ return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+ switch mi.GetType() {
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ return true
+ }
+ return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[MetricKey]int64,
+ map[MetricKey]DistributionValue,
+ map[MetricKey]GaugeValue) {
+ counters := make(map[MetricKey]int64)
+ distributions := make(map[MetricKey]DistributionValue)
+ gauges := make(map[MetricKey]GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ if IsCounter(minfo) {
+ value, err := extractCounterResult(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ counters[key] = value
+ } else if IsDistribution(minfo) {
+ value, err := extractDistributionValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ distributions[key] = value
+ } else if IsGauge(minfo) {
+ value, err := extractGaugeValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ gauges[key] = value
+ } else {
+ log.Println("unknown metric type")
+ }
+ }
+ return counters, distributions, gauges
+}
+
+func mergeCounters(attempted map[MetricKey]int64, committed
map[MetricKey]int64) []CounterResult {
+ res := make([]CounterResult, 0)
+
+ for k := range attempted {
+ if v, ok := committed[k]; ok {
+ res = append(res, CounterResult{Attempted:
attempted[k], Committed: v, Key: k})
+ } else {
+ res = append(res, CounterResult{Attempted:
attempted[k], Committed: -1, Key: k})
+ }
Review comment:
Style nit: This makes it clearer that the only difference between the two
clauses is the value of v.
```suggestion
v, ok := committed[k]
if !ok {
v = -1
}
res = append(res, CounterResult{Attempted: attempted[k],
Committed: v, Key: k})
```
If you make this change, please make the same change for distributions and gauges
below for consistency.
##########
File path: sdks/go/pkg/beam/testing/ptest/ptest.go
##########
@@ -73,7 +73,8 @@ func Run(p *beam.Pipeline) error {
if *Runner == "" {
*Runner = defaultRunner
}
- return beam.Run(context.Background(), *Runner, p)
+ _, err := beam.Run(context.Background(), *Runner, p)
+ return err
Review comment:
We should consider changing ptest, or at least adding a new
ptest.RunWithMetrics endpoint that also returns the pipeline results, since
metrics are very valuable and convenient to check in tests. That
can be handled as a TODO (BEAM-####), however, rather than as part of this
PR, since ptest is used pervasively.
##########
File path: sdks/go/pkg/beam/x/beamx/run.go
##########
@@ -40,5 +40,13 @@ var runner = flag.String("runner", "direct", "Pipeline
runner.")
// defaults to the direct runner, but all beam-distributed runners and textio
// filesystems are implicitly registered.
func Run(ctx context.Context, p *beam.Pipeline) error {
+ _, err := beam.Run(ctx, *runner, p)
+ return err
+}
+
+// RunPipelineWithMetrics invokes beam.Run with the runner supplied by the
+// flag "runner". Returns a beam.PipelineResult objects, which can be
+// accessed to query the pipeline's metrics.
+func RunPipelineWithMetrics(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error) {
Review comment:
I'd omit "Pipeline" from the name (i.e. RunWithMetrics): the parameter type
and the sibling Run function already make it clear that a pipeline is being executed.
##########
File path: sdks/go/pkg/beam/core/metrics/monitoring_infos.go
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromMonitoringInfos extracts metrics from GetJobMetrics's response and
+// groups them into counters, distributions and gauges.
+func FromMonitoringInfos(resultsProto *jobpb.MetricResults) (
+ []CounterResult,
+ []DistributionResult,
+ []GaugeResult) {
+ ac, ad, ag := groupByType(resultsProto.Attempted)
+ cc, cd, cg := groupByType(resultsProto.Committed)
+
+ c := mergeCounters(ac, cc)
+ d := mergeDistributions(ad, cd)
+ g := mergeGauges(ag, cg)
+
+ return c, d, g
+}
+
+// IsCounter returns true if the monitoring info is a counter metric.
+func IsCounter(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:sum_int64:v1"
+}
+
+// IsDistribution returns true if the monitoring info is a distribution metric.
+func IsDistribution(mi *pipepb.MonitoringInfo) bool {
+ return mi.GetType() == "beam:metrics:distribution_int64:v1"
+}
+
+// IsGauge returns true if the monitoring info is a gauge metric.
+func IsGauge(mi *pipepb.MonitoringInfo) bool {
+ switch mi.GetType() {
+ case
+ "beam:metrics:latest_int64:v1",
+ "beam:metrics:top_n_int64:v1",
+ "beam:metrics:bottom_n_int64:v1":
+ return true
+ }
+ return false
+}
+
+func groupByType(minfos []*pipepb.MonitoringInfo) (
+ map[MetricKey]int64,
+ map[MetricKey]DistributionValue,
+ map[MetricKey]GaugeValue) {
+ counters := make(map[MetricKey]int64)
+ distributions := make(map[MetricKey]DistributionValue)
+ gauges := make(map[MetricKey]GaugeValue)
+
+ for _, minfo := range minfos {
+ key, err := extractKey(minfo)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ r := bytes.NewReader(minfo.GetPayload())
+
+ if IsCounter(minfo) {
Review comment:
Style nit: Consider changing this to a switch statement.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]