lostluck commented on a change in pull request #13272:
URL: https://github.com/apache/beam/pull/13272#discussion_r521706785



##########
File path: sdks/go/pkg/beam/core/metrics/monitoring_infos.go
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+       "bytes"
+       "fmt"
+       "log"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"

Review comment:
       Consider moving the contents of this file into the new metricsx package,
which is where I thought it was going to move last time. Is there a specific
reason this code can't go there instead of this metrics package?
   
   Part of the reason for this separation is to keep the protos from being
depended on by the same packages users construct pipelines with, which is why
most packages come in x and non-x versions.
   
   E.g. graph, which is a pure Go representation for handling the pipeline
graph, and graphx, which translates that graph into the proto representation.
Similarly, the coder and coderx packages: the representation of the coders,
and the actual execution of the coders.
   
   This notionally allows much of the Go SDK front end to be reused for a
non-Beam backend without depending on the Beam proto structure, and keeps the
protos away from pipeline authors, for whom the protos should be
implementation details.
   
   Specifically, we try to hide the proto details from pipeline authors, not
runner authors. Runner authors have to deal with the protos by definition, but
pipeline authors should need only a limited subset of those details, and
certainly not the protos themselves. We try to keep most user-facing things in
the beam package, but that's not appropriate for everything (it would make the
package too big, too central, and harder to maintain).
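   To make the x/non-x split concrete, here is a minimal, hypothetical sketch
of the layering: a proto-free result type that pipeline authors would see, and
a translation function that only the "x" layer would own. `monitoringInfo` is
a simplified stand-in for `pipepb.MonitoringInfo` (the real one carries an
encoded byte payload), and the URN and label keys are modeled on Beam's user
counter spec; treat all of them as assumptions.

```go
package main

import "fmt"

// ---- Pure-Go layer (analogous to package metrics) ----

// CounterResult is a hypothetical, proto-free view of a user counter.
type CounterResult struct {
	Namespace, Name string
	Committed       int64
}

// ---- Translation layer (analogous to an "x" package such as metricsx) ----

// userSumInt64URN is assumed to match Beam's user counter URN.
const userSumInt64URN = "beam:metric:user:sum_int64:v1"

// monitoringInfo is a hypothetical stand-in for pipepb.MonitoringInfo.
// Only this layer would import the generated protos.
type monitoringInfo struct {
	Urn     string
	Labels  map[string]string
	Payload int64 // simplified: the real proto holds encoded bytes
}

// fromMonitoringInfos converts proto-shaped data into pure-Go results,
// skipping any URN it does not understand.
func fromMonitoringInfos(mis []monitoringInfo) []CounterResult {
	var out []CounterResult
	for _, mi := range mis {
		if mi.Urn != userSumInt64URN {
			continue
		}
		out = append(out, CounterResult{
			Namespace: mi.Labels["NAMESPACE"],
			Name:      mi.Labels["NAME"],
			Committed: mi.Payload,
		})
	}
	return out
}

func main() {
	mis := []monitoringInfo{
		{Urn: userSumInt64URN, Labels: map[string]string{"NAMESPACE": "wordcount", "NAME": "words"}, Payload: 3},
		{Urn: "some:other:urn", Payload: 7}, // ignored by the translation
	}
	for _, c := range fromMonitoringInfos(mis) {
		fmt.Printf("%s/%s = %d\n", c.Namespace, c.Name, c.Committed)
	}
}
```

   With this split, user-facing code depends only on `CounterResult`, and a
non-Beam backend could supply its own translation without touching the protos.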

##########
File path: sdks/go/pkg/beam/core/metrics/monitoring_infos_test.go
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/metricsx"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+func TestCounterExtraction(t *testing.T) {

Review comment:
       Similarly, these tests should be in the metricsx package.

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
##########
@@ -79,15 +81,50 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
 
        jobID, err := Submit(ctx, client, prepID, token)
        if err != nil {
-               return "", err
+               return presult, err
        }
 
        log.Infof(ctx, "Submitted job: %v", jobID)
 
        // (4) Wait for completion.
 
        if async {
-               return jobID, nil
+               return presult, nil
        }
-       return jobID, WaitForCompletion(ctx, client, jobID)
+       err = WaitForCompletion(ctx, client, jobID)
+
+       res, err := newUniversalPipelineResult(ctx, jobID, client)
+       if err != nil {
+               return presult, err
+       }
+       presult = res
+
+       return presult, err
+}
+
+type universalPipelineResult struct {
+       JobID   string
+       metrics *metrics.Results
+}
+
+func newUniversalPipelineResult(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*universalPipelineResult, error) {
+       metrics, err := getMetrics(ctx, jobID, client)
+       if err != nil {
+               return &universalPipelineResult{jobID, nil}, err
+       }
+       return &universalPipelineResult{jobID, metrics}, err
+}
+
+func (pr universalPipelineResult) Metrics() metrics.Results {
+       return *pr.metrics
+}
+
+func getMetrics(ctx context.Context, jobID string, client jobpb.JobServiceClient) (*metrics.Results, error) {

Review comment:
       Is it necessary to isolate this part in its own function? It's only
used in newUniversalPipelineResult, which takes all the same arguments. If
getMetrics were tested separately, sure, but I don't see a reason to test it
apart from newUniversalPipelineResult, for the same reason.

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -448,3 +453,73 @@ func (m *gauge) get() (int64, time.Time) {
        defer m.mu.Unlock()
        return m.v, m.t
 }
+
+// GaugeValue is the value of a Gauge metric.
+type GaugeValue struct {
+       Value     int64
+       Timestamp time.Time
+}
+
+// Results represents all metrics gathered during the job's execution.
+// It allows for querying metrics using a provided filter.
+type Results struct {
+       counters      []CounterResult
+       distributions []DistributionResult
+       gauges        []GaugeResult
+}
+
+// AllMetrics returns all metrics from a Results instance.
+func (mr Results) AllMetrics() QueryResults {
+       return QueryResults{mr.counters, mr.distributions, mr.gauges}
+}
+
+// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
+
+// QueryResults is the result of a query. Allows accessing all of the
+// metrics that matched the filter.
+type QueryResults struct {
+       counters      []CounterResult
+       distributions []DistributionResult
+       gauges        []GaugeResult
+}
+
+// GetCounters returns an array of counter metrics.
+func (qr QueryResults) GetCounters() []CounterResult {

Review comment:
       That would still be idiomatic Go. The question is whether we need to
"hide" this behind an interface or not. I'm leaning towards no.
   
   The real "trick" is probably copying out the results so users can
query/mutate their own copies freely, without worrying about aliasing issues.
Slices are not immutable: a slice header points to a backing array, which can
be shared by multiple headers. The current method sets have just as many
aliasing issues as exported fields would, so hiding the fields isn't helping
much.
   
   Idiomatically, BTW, if we were keeping these methods, we wouldn't have the
Get* prefix, which is very much a Javaism.
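   A minimal sketch of the aliasing problem and the copy-out alternative,
using Go-style naming (`Counters`, not `GetCounters`). `CounterResult` here is
a hypothetical simplified type, and `countersAliased` exists only to show the
problem. A plain `copy` is a full defensive copy only because the element type
holds no pointers, maps, or slices.

```go
package main

import "fmt"

// CounterResult is a hypothetical, simplified counter result.
type CounterResult struct {
	Name  string
	Value int64
}

// QueryResults holds the matched metrics in unexported fields.
type QueryResults struct {
	counters []CounterResult
}

// countersAliased returns the internal slice directly. Even with a value
// receiver, the caller shares the backing array, so writes leak back.
func (qr QueryResults) countersAliased() []CounterResult { return qr.counters }

// Counters returns a fresh copy that callers may sort or mutate freely.
func (qr QueryResults) Counters() []CounterResult {
	out := make([]CounterResult, len(qr.counters))
	copy(out, qr.counters)
	return out
}

func main() {
	qr := QueryResults{counters: []CounterResult{{Name: "elements", Value: 10}}}

	a := qr.countersAliased()
	a[0].Value = 99
	fmt.Println(qr.counters[0].Value) // 99: the caller's write leaked in

	qr.counters[0].Value = 10
	c := qr.Counters()
	c[0].Value = 7
	fmt.Println(qr.counters[0].Value) // 10: the copy is independent
}
```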




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

