lostluck commented on a change in pull request #15482:
URL: https://github.com/apache/beam/pull/15482#discussion_r717174187
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -529,6 +561,21 @@ func (r CounterResult) Result() int64 {
return r.Attempted
}
+// Name returns the Name value from the Key field.
+func (r CounterResult) Name() string {
+ return r.Key.Name
+}
+
+// Namespace returns the Namespace value from the Key field.
Review comment:
```suggestion
// Namespace returns the Namespace of this Counter.
```
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -529,6 +561,21 @@ func (r CounterResult) Result() int64 {
return r.Attempted
}
+// Name returns the Name value from the Key field.
Review comment:
```suggestion
// Name returns the Name of this Counter.
```
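With both suggestions applied, the accessors would read roughly as below. This is a minimal sketch: `MetricKey` and this `CounterResult` are simplified stand-ins, not the SDK's actual definitions, and only the fields the accessors touch are modeled. The key keeps a `Step` field but gets no `Step()` accessor, in line with the later comment about Step filtering.

```go
package main

// MetricKey is a hypothetical, simplified stand-in for the key that
// CounterResult carries in the PR; only the fields used here are modeled.
type MetricKey struct {
	Step, Name, Namespace string
}

// CounterResult is a simplified stand-in for the SDK's counter result.
type CounterResult struct {
	Attempted, Committed int64
	Key                  MetricKey
}

// Name returns the Name of this Counter.
func (r CounterResult) Name() string {
	return r.Key.Name
}

// Namespace returns the Namespace of this Counter.
func (r CounterResult) Namespace() string {
	return r.Key.Namespace
}
```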
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -482,7 +482,39 @@ func (mr Results) AllMetrics() QueryResults {
return QueryResults{mr.counters, mr.distributions, mr.gauges}
}
-// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
+// SingleResult interface facilitates metrics query filtering methods.
+type SingleResult interface {
+ Step() string
Review comment:
Let's not implement Step filtering for now.
At present, I don't think we do a meaningful translation from the
internal/uniqueid representation of the transform ID to something the user
would be interested in. Until we do, exposing it would lock us into that
bad/weird behavior, since changing it later would break users.
Also, "Step" isn't part of the user-facing Beam model, so it's confusing at
best.
At most, please add a comment:
`// TODO(BEAM-11217): Implement querying metrics by DoFn`
We can figure this out when we also return PCollection metrics, which are
also easier to understand when queried by their parent DoFn.
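A minimal sketch of the interface with `Step()` dropped and the requested TODO in place. `counter` and `query` are hypothetical simplifications standing in for `CounterResult` and `Results.Query`; the predicate shape `func(SingleResult) bool` matches how the integration test in this PR calls `Query`.

```go
package main

// SingleResult lets query predicates inspect a single metric result.
// Step() is intentionally omitted until transform IDs map to something
// users can meaningfully filter on.
type SingleResult interface {
	Name() string
	Namespace() string
	// TODO(BEAM-11217): Implement querying metrics by DoFn
}

// counter is a hypothetical, simplified counter result.
type counter struct {
	name, namespace string
	value           int64
}

func (c counter) Name() string      { return c.name }
func (c counter) Namespace() string { return c.namespace }

// query returns the counters accepted by the predicate f, in input order.
func query(cs []counter, f func(SingleResult) bool) []counter {
	var out []counter
	for _, c := range cs {
		if f(c) {
			out = append(out, c)
		}
	}
	return out
}
```

Filtering only through a predicate keeps the surface small: new accessors can be added to `SingleResult` later without changing the query signature.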
##########
File path: sdks/go/test/integration/wordcount/wordcount_test.go
##########
@@ -75,19 +87,26 @@ func TestWordCount(t *testing.T) {
},
2,
"Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above
+ "smallWords",
+ 1,
},
}
for _, test := range tests {
integration.CheckFilters(t)
- const filename = "memfs://input"
- memfs.Write(filename, []byte(strings.Join(test.lines, "\n")))
-
- p := WordCount(filename, test.hash, test.words)
- _, err := ptest.RunWithMetrics(p)
+ p, s := beam.NewPipelineWithRoot()
+ lines := beam.CreateList(s, test.lines)
+ WordCountFromPCol(s, lines, test.hash, test.words)
+ pr, err := ptest.RunWithMetrics(p)
if err != nil {
t.Errorf("WordCount(\"%v\") failed: %v",
strings.Join(test.lines, "|"), err)
}
+ qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool {
+ return sr.Name() == test.name
+ })
+ if len(qr.Counters()) != test.counters {
t.Errorf("Metrics filtering with Name failed.\nGot %d counters \n Want %d counters", len(qr.Counters()), test.counters)
Review comment:
As a rule, if you don't have to, don't spread test output out like this.
This check is going to change given the other comments, but the following
reads better and says which method failed.
```
"Metrics().Query(by Name) failed. Got %d counters, want %d counters"
```
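For illustration, a hypothetical helper that yields the suggested single-line message. In the test itself the same format string would simply be passed to `t.Errorf`, typically with the `if got, want := ...; got != want` idiom so each value is computed once.

```go
package main

import "fmt"

// countMismatch is a hypothetical helper: it returns "" when got == want,
// otherwise a single-line failure message in the suggested format.
func countMismatch(got, want int) string {
	if got == want {
		return ""
	}
	return fmt.Sprintf("Metrics().Query(by Name) failed. Got %d counters, want %d counters", got, want)
}
```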
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -529,6 +561,21 @@ func (r CounterResult) Result() int64 {
return r.Attempted
}
+// Name returns the Name value from the Key field.
+func (r CounterResult) Name() string {
+ return r.Key.Name
+}
+
+// Namespace returns the Namespace value from the Key field.
+func (r CounterResult) Namespace() string {
+ return r.Key.Namespace
+}
+
+// Step returns the Step value from the Key field.
+func (r CounterResult) Step() string {
Review comment:
Per earlier comment, delete these Step methods for now.
##########
File path: sdks/go/test/integration/wordcount/wordcount_test.go
##########
@@ -30,16 +32,20 @@ import (
func TestWordCount(t *testing.T) {
tests := []struct {
- lines []string
- words int
- hash string
+ lines []string
+ words int
+ hash string
+ name string
+ counters int
Review comment:
While it's neat to parameterize these in the test, there's no value in
it if they're all the same (1 counter, and we don't even check its value).
What we do have are 2 different metrics: the line-len distribution metric
and the small words counter. Put the expected values of those metrics here,
rather than asserting the count (which is always 1, assuming the
pipeline executed, as we found out).
So we want to add fields that let us check the values of these metrics.
```
smallWordsCount int
lineLenCount, lineLenSum, lineLenMin, lineLenMax int
```
And then we check those accordingly in the test loop.
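A minimal sketch of the table fields and the per-case check. The field names follow the suggestion above; `distResult` and `checkMetrics` are hypothetical helpers, and a real test would read the observed values from the pipeline's metric results (via the `Query` API under review) rather than take them as arguments.

```go
package main

import "fmt"

// wordCountCase holds one test case plus the expected metric values.
type wordCountCase struct {
	lines                                            []string
	words                                            int
	hash                                             string
	smallWordsCount                                  int
	lineLenCount, lineLenSum, lineLenMin, lineLenMax int
}

// distResult is a hypothetical stand-in for a distribution metric result.
type distResult struct {
	Count, Sum, Min, Max int
}

// checkMetrics compares observed metric values with the case's expectations
// and returns one single-line message per mismatching metric.
func checkMetrics(tc wordCountCase, smallWords int, lineLen distResult) []string {
	var errs []string
	if smallWords != tc.smallWordsCount {
		errs = append(errs, fmt.Sprintf("smallWords counter: got %d, want %d", smallWords, tc.smallWordsCount))
	}
	want := distResult{tc.lineLenCount, tc.lineLenSum, tc.lineLenMin, tc.lineLenMax}
	if lineLen != want {
		errs = append(errs, fmt.Sprintf("lineLen distribution: got %+v, want %+v", lineLen, want))
	}
	return errs
}
```

Checking the values, not just the number of matching counters, means a case fails when the pipeline runs but computes the wrong thing.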
--
This is an automated message from the Apache Git Service.