lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r521706785
##########
File path: sdks/go/pkg/beam/core/metrics/monitoring_infos.go
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"

Review comment:
Consider moving the contents of this file into the new metricsx package, which is where I thought it was going to move last time. Is there a specific reason this code can't go there instead of the metrics package?

Part of the separation we maintain is to keep the protos from being depended on by the same packages users construct pipelines with, which is why, for the most part, there are x and non-x versions of packages. E.g. graph is a pure Go representation of the pipeline graph, and graphx translates that graph into the proto representation. Similarly, the coder package holds the representation of the coders, and coderx holds the actual execution of the coders.
This notionally allows much of the Go SDK front end to be reused for a non-Beam backend without depending on the Beam proto structure, and keeps the protos away from pipeline authors, for whom the protos should be implementation details. Specifically, we try to hide the proto details from pipeline authors, not runner authors. Runner authors have to deal with the protos by definition, but pipeline authors should need only a limited subset of those details, and certainly not the protos themselves. We try to make most user-facing things part of the beam package, but that's not appropriate for everything (it makes the package too big, too central, and harder to maintain).

##########
File path: sdks/go/pkg/beam/core/metrics/monitoring_infos_test.go
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+func TestCounterExtraction(t *testing.T) {

Review comment:
Similarly, these tests should be in the metricsx package.
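To make the x/non-x split from the first comment concrete, here is a minimal single-file sketch under simplified assumptions: the user-facing type carries no proto dependency, and only the translation function (the part that would live in metricsx, next to tests like TestCounterExtraction) touches proto-shaped data. `monitoringInfo` is a hypothetical stand-in for `pipepb.MonitoringInfo`, and the URN, label keys, field names, and `FromMonitoringInfos` are assumptions for illustration, not the PR's code.

```go
package main

import "fmt"

// ---- Would live in a user-facing package such as metrics: no proto imports. ----

// CounterResult is a pure-Go view of a user counter. The fields are
// hypothetical and simplified for this sketch.
type CounterResult struct {
	Namespace string
	Name      string
	Value     int64
}

// ---- Would live in an "x" package such as metricsx: may import the protos. ----

// monitoringInfo is a hypothetical stand-in for pipepb.MonitoringInfo so this
// sketch compiles without the Beam protos. The real message carries an
// encoded byte payload rather than a decoded int64.
type monitoringInfo struct {
	Urn     string
	Labels  map[string]string
	Payload int64
}

// userCounterURN is the URN this sketch assumes for user sum_int64 counters.
const userCounterURN = "beam:metric:user:sum_int64:v1"

// FromMonitoringInfos translates proto-shaped data into the pure-Go type,
// skipping anything that is not a user counter. Only this layer knows about
// the wire representation.
func FromMonitoringInfos(mis []monitoringInfo) []CounterResult {
	var out []CounterResult
	for _, mi := range mis {
		if mi.Urn != userCounterURN {
			continue
		}
		out = append(out, CounterResult{
			Namespace: mi.Labels["NAMESPACE"],
			Name:      mi.Labels["NAME"],
			Value:     mi.Payload,
		})
	}
	return out
}

func main() {
	mis := []monitoringInfo{
		{Urn: userCounterURN, Labels: map[string]string{"NAMESPACE": "ns", "NAME": "elements"}, Payload: 42},
		{Urn: "beam:metric:element_count:v1", Payload: 7},
	}
	for _, c := range FromMonitoringInfos(mis) {
		fmt.Printf("%s/%s = %d\n", c.Namespace, c.Name, c.Value) // ns/elements = 42
	}
}
```

In a real layout the two halves would sit in separate packages, so that importing metrics never pulls in pipeline_v1; only code that already deals with the runner (and its tests) would import the x package.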
##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
##########
@@ -79,15 +81,50 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
 	jobID, err := Submit(ctx, client, prepID, token)
 	if err != nil {
-		return "", err
+		return presult, err
 	}
 	log.Infof(ctx, "Submitted job: %v", jobID)
 	// (4) Wait for completion.
 	if async {
-		return jobID, nil
+		return presult, nil
 	}
-	return jobID, WaitForCompletion(ctx, client, jobID)
+	err = WaitForCompletion(ctx, client, jobID)
+
+	res, err := newUniversalPipelineResult(ctx, jobID, client)
+	if err != nil {
+		return presult, err
+	}
+	presult = res
+
+	return presult, err
+}
+
+type universalPipelineResult struct {
+	JobID   string
+	metrics *metrics.Results
+}
+
+func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*universalPipelineResult, error) {
+	metrics, err := getMetrics(ctx, jobID, client)
+	if err != nil {
+		return &universalPipelineResult{jobID, nil}, err
+	}
+	return &universalPipelineResult{jobID, metrics}, err
+}
+
+func (pr universalPipelineResult) Metrics() metrics.Results {
+	return *pr.metrics
+}
+
+func getMetrics(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*metrics.Results, error) {

Review comment:
Is it necessary to isolate this part in its own function? It's only used in newUniversalPipelineResult, which takes all the same arguments. If it were tested separately, sure, but since it takes the same arguments I don't see a reason to test it separately from newUniversalPipelineResult either.

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
 	defer m.mu.Unlock()
 	return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+	Value     int64
+	Timestamp time.Time
+}
+
+// Results represents all metrics gathered during the job's execution.
+// It allows for querying metrics using a provided filter.
+type Results struct {
+	counters      []CounterResult
+	distributions []DistributionResult
+	gauges        []GaugeResult
+}
+
+// AllMetrics returns all metrics from a Results instance.
+func (mr Results) AllMetrics() QueryResults {
+	return QueryResults{mr.counters, mr.distributions, mr.gauges}
+}
+
+// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
+
+// QueryResults is the result of a query. Allows accessing all of the
+// metrics that matched the filter.
+type QueryResults struct {
+	counters      []CounterResult
+	distributions []DistributionResult
+	gauges        []GaugeResult
+}
+
+// GetCounters returns an array of counter metrics.
+func (qr QueryResults) GetCounters() []CounterResult {

Review comment:
That would still be idiomatic Go. The question is whether we need to "hide" this behind an interface, and I'm leaning toward no. The real "trick" is probably copying out the results so users can query and mutate their own copies freely, without worrying about aliasing. Slices are not immutable: a slice header points to a backing array, which can be shared by multiple headers. The current method sets have just as many aliasing issues as exposing raw fields would, so keeping the fields unexported isn't helping much. BTW, idiomatically, if we were keeping these methods, we wouldn't use the Get* prefix, which is very much a Java-ism.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
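The aliasing point can be shown with a reduced sketch. Assuming simplified, hypothetical `CounterResult` fields, it contrasts handing out the internal slice (which still aliases, even through a value receiver) with copying it out into a `QueryResults` that exposes plain exported fields and no Get* accessors. `aliasedAllMetrics` is a hypothetical name used only for the contrast; this is not the PR's implementation.

```go
package main

import "fmt"

// CounterResult is a simplified stand-in for the PR's CounterResult; its
// fields here are hypothetical.
type CounterResult struct {
	Name  string
	Value int64
}

// Results keeps its data unexported, as in the diff.
type Results struct {
	counters []CounterResult
}

// QueryResults uses plain exported fields instead of Get* accessors.
type QueryResults struct {
	Counters []CounterResult
}

// aliasedAllMetrics hands out the internal slice directly. The value receiver
// copies the slice header, but both headers share one backing array.
func (mr Results) aliasedAllMetrics() QueryResults {
	return QueryResults{Counters: mr.counters}
}

// AllMetrics copies the counters into a fresh backing array, so callers can
// mutate their QueryResults without affecting mr. The copy is shallow: any
// reference-typed fields inside CounterResult would still be shared.
func (mr Results) AllMetrics() QueryResults {
	out := make([]CounterResult, len(mr.counters))
	copy(out, mr.counters)
	return QueryResults{Counters: out}
}

func main() {
	mr := Results{counters: []CounterResult{{Name: "elements", Value: 10}}}

	aliased := mr.aliasedAllMetrics()
	aliased.Counters[0].Value = 99
	fmt.Println(mr.counters[0].Value) // 99: the caller mutated mr's data

	copied := mr.AllMetrics()
	copied.Counters[0].Value = 1
	fmt.Println(mr.counters[0].Value) // 99: the copy is independent
}
```

Copying once at the boundary, when results are handed to the user, keeps the aliasing question separate from the accessor question, so plain exported fields become a reasonable choice.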
