lostluck commented on a change in pull request #15175:
URL: https://github.com/apache/beam/pull/15175#discussion_r670813578
##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {
+	s = s.Scope(fmt.Sprintf("passert.EqualsFloat[%v]", threshold))
+	beam.ParDo0(s, &thresholdFn{threshold: threshold}, beam.Impulse(s), beam.SideInput{Input: observed}, beam.SideInput{Input: expected})
+}
+
+type thresholdFn struct {
+	threshold float64
+}
+
+func (f *thresholdFn) ProcessElement(_ []byte, observed, expected func(*beam.T) bool) error {
+	var observedValues, expectedValues []float64
+	var observedInput, expectedInput beam.T
+	for observed(&observedInput) {
+		val := reflect.ValueOf(observedInput.(interface{})).Convert(reflectx.Float64).Interface().(float64)
Review comment:
Since we now have 4 places where this reflective snippet is used,
consider writing a "toFloat" function to abstract it, making the calling code
easier to read.
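A minimal sketch of what such a helper might look like. The name `toFloat` comes from the comment above; the signature, the error return, and the local `float64Type` variable (standing in for `reflectx.Float64`) are assumptions for illustration, not the code that was merged.

```go
package main

import (
	"fmt"
	"reflect"
)

// float64Type is a local stand-in for reflectx.Float64 so that this
// sketch compiles without the Beam SDK.
var float64Type = reflect.TypeOf(float64(0))

// toFloat (hypothetical helper) converts a value with an integer or float
// kind to float64. It returns an error instead of panicking when the
// value cannot be converted, e.g. for strings or a nil interface.
func toFloat(v interface{}) (float64, error) {
	rv := reflect.ValueOf(v)
	if !rv.IsValid() || !rv.Type().ConvertibleTo(float64Type) {
		return 0, fmt.Errorf("toFloat: cannot convert %T to float64", v)
	}
	return rv.Convert(float64Type).Interface().(float64), nil
}
```

Each of the four call sites would then shrink to something like `val, err := toFloat(observedInput)`. Returning an error rather than panicking is a design choice of this sketch; the inline snippet in the diff panics on a non-convertible value.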
##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {
+	s = s.Scope(fmt.Sprintf("passert.EqualsFloat[%v]", threshold))
+	beam.ParDo0(s, &thresholdFn{threshold: threshold}, beam.Impulse(s), beam.SideInput{Input: observed}, beam.SideInput{Input: expected})
+}
+
+type thresholdFn struct {
+	threshold float64
Review comment:
Please export your field names.
While passert functions are intended to be run on single machines for
testing, unexported fields are never serialized as part of the DoFn, so they
do not function as intended on portable runners.
Basically, all the rules the compiler enforces about exported and unexported
names are respected by the underlying "reflect" package as well. This means
that unexported fields can't be read and then subsequently written
reflectively, which effectively drops their data entirely.
It's legal to have DoFns that only have unexported fields (though it
probably shouldn't be), and we likely should add a validation that there's at
least one exported field for types used as PCollection elements. It's my number
one comment to users with issues...
(Unexported type and function names are fine, though.)
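To illustrate the data loss described above, here is a small sketch that round-trips two structs through `encoding/json`. The JSON encoder is only a stand-in chosen because it follows the same reflect visibility rules; this does not claim to reproduce the SDK's own serialization path. The type names `unexportedFn` and `exportedFn` and the `roundTrip` helper are hypothetical.

```go
package main

import "encoding/json"

// unexportedFn mirrors the struct in the diff: its only field is unexported.
type unexportedFn struct {
	threshold float64
}

// exportedFn is the same struct with the field exported.
type exportedFn struct {
	Threshold float64
}

// roundTrip (hypothetical helper) encodes in and decodes the result into
// out, imitating a value being shipped to a worker and rebuilt there.
func roundTrip(in, out interface{}) error {
	data, err := json.Marshal(in)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}
```

Round-tripping `&unexportedFn{threshold: 0.5}` yields a struct whose `threshold` is back at its zero value, while `&exportedFn{Threshold: 0.5}` keeps its value.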
##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {
Review comment:
As a second note to make this easier: I recommend putting the main
implementation code into a "TryEqualsFloats" function that returns an error,
and having the "EqualsFloats" function call it and panic if the returned error
isn't nil.
This lets you test your validation error without complex "defer recover"
code. Users who accept the runtime panic can use the shorter function call,
and users who want to handle the error themselves can use the "Try" call.
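The Try/panic split suggested above can be sketched without the Beam SDK. The function names `tryCheckThreshold` and `checkThreshold`, and the non-negative threshold rule, are invented for illustration; the real "Try" function would wrap the transform's own validation.

```go
package main

import "fmt"

// tryCheckThreshold (hypothetical) plays the role of the "Try" variant:
// it reports a validation problem as an error. The rule checked here is
// made up for the sketch.
func tryCheckThreshold(threshold float64) error {
	if threshold < 0 {
		return fmt.Errorf("threshold must be non-negative, got %v", threshold)
	}
	return nil
}

// checkThreshold (hypothetical) plays the role of the short variant: it
// delegates to the Try function and panics if an error comes back.
func checkThreshold(threshold float64) {
	if err := tryCheckThreshold(threshold); err != nil {
		panic(err)
	}
}
```

A unit test can then assert on the error from `tryCheckThreshold(-1)` directly, with no `defer`/`recover` needed to observe the failure.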
##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {
Review comment:
We should likely check that both the observed and expected PCollection
types are convertible to floats at pipeline construction time.
This is the tricky bit with leaning on "beam.T" in the DoFn's ProcessElement
function: it doesn't validate all the types. As the transform and the code are
written, the framework can only validate that the two side input types are
identical. Construction will pass if you provide two PCollections of strings,
and the pipeline will then fail at runtime, which is undesirable.
So we should check it when the pipeline is constructed, to fail as soon as
possible. This is a guiding principle: if you can turn a pipeline runtime
failure into a pipeline construction failure, do it.
e.g., [`beam.ValidateNonCompositeType`](https://github.com/apache/beam/blob/c9f2321d70cac02aa59bd7ea4877cf27209ae4bc/sdks/go/pkg/beam/validate.go#L38)
gets the reflect.Type out of the PCollection, which can then be checked to
have a number underneath it or similar (à la
[validateNonComplexNumber](https://github.com/apache/beam/blob/c9f2321d70cac02aa59bd7ea4877cf27209ae4bc/sdks/go/pkg/beam/transforms/stats/util.go#L54)
from the stats transforms package).
(And I just noticed that there's an extra "not" in the comment for that
*sigh*)