lostluck commented on a change in pull request #15175:
URL: https://github.com/apache/beam/pull/15175#discussion_r670813578



##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {
+       s = s.Scope(fmt.Sprintf("passert.EqualsFloat[%v]", threshold))
+       beam.ParDo0(s, &thresholdFn{threshold: threshold}, beam.Impulse(s), beam.SideInput{Input: observed}, beam.SideInput{Input: expected})
+}
+
+type thresholdFn struct {
+       threshold float64
+}
+
+func (f *thresholdFn) ProcessElement(_ []byte, observed, expected func(*beam.T) bool) error {
+       var observedValues, expectedValues []float64
+       var observedInput, expectedInput beam.T
+       for observed(&observedInput) {
+               val := reflect.ValueOf(observedInput.(interface{})).Convert(reflectx.Float64).Interface().(float64)

Review comment:
       Since we now have 4 places where this reflective snippet is used, 
consider writing a "toFloat" function to abstract it, making the calling code 
easier to read.
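
A minimal sketch of what such a helper could look like. The name `toFloat` comes from the suggestion above; the `float64Type` variable is an assumption standing in for `reflectx.Float64` so that the example runs without the Beam SDK.

```go
package main

import (
	"fmt"
	"reflect"
)

// float64Type stands in for reflectx.Float64 (assumption, to avoid the Beam import).
var float64Type = reflect.TypeOf(float64(0))

// toFloat converts a numeric value (int, float32, ...) to float64 via reflection.
// Like the inline snippet it replaces, it panics if v is not convertible.
func toFloat(v interface{}) float64 {
	return reflect.ValueOf(v).Convert(float64Type).Interface().(float64)
}

func main() {
	// Each call site shrinks to a single readable call.
	fmt.Println(toFloat(int32(3)), toFloat(float32(1.5)))
}
```

With a helper like this, each of the four call sites would reduce to something like `val := toFloat(observedInput)`.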

##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {
+       s = s.Scope(fmt.Sprintf("passert.EqualsFloat[%v]", threshold))
+       beam.ParDo0(s, &thresholdFn{threshold: threshold}, beam.Impulse(s), beam.SideInput{Input: observed}, beam.SideInput{Input: expected})
+}
+
+type thresholdFn struct {
+       threshold float64

Review comment:
       Export your field names, please.
   
   While passert functions are intended to be run on single machines for 
testing, unexported fields are never serialized as part of the DoFn, so the 
DoFn does not function as intended on portable runners.
   
   Basically, all the rules the compiler enforces about exported and unexported 
names are respected by the underlying "reflect" package for access as well. 
This means that unexported fields can't be read and then subsequently written 
to reflectively, which effectively drops their data entirely.
   
   It's legal to have DoFns that only have unexported fields (though it 
probably shouldn't be), and we likely should add a validation that there's at 
least one exported field for types used as PCollection elements. It's my number 
one comment to users with issues...
   
   (unexported type and function names are fine though)
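
A small illustration of the effect described above. It uses the standard library's `encoding/json` as a stand-in for DoFn serialization (an assumption: the Beam SDK has its own encoding path, but any reflection-based encoder is bound by the same visibility rules). The type names `unexportedFn` and `exportedFn` and the helper `roundTrip` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unexportedFn mirrors the struct under review: its field is invisible to reflection-based encoders.
type unexportedFn struct {
	threshold float64
}

// exportedFn is the same struct with the field exported.
type exportedFn struct {
	Threshold float64
}

// roundTrip encodes v and decodes the bytes into out, roughly as a runner
// would when shipping a DoFn to a worker.
func roundTrip(v, out interface{}) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

func main() {
	var u unexportedFn
	var e exportedFn
	if err := roundTrip(&unexportedFn{threshold: 0.5}, &u); err != nil {
		panic(err)
	}
	if err := roundTrip(&exportedFn{Threshold: 0.5}, &e); err != nil {
		panic(err)
	}
	// The unexported value is silently lost; the exported one survives.
	fmt.Println(u.threshold, e.Threshold)
}
```

No error is reported for the unexported field: it simply comes back as the zero value, which is why the problem tends to show up only on runners that actually serialize the DoFn.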

##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {

Review comment:
       As a second note to make this easier: I recommend putting the main 
implementation code into a "TryEqualsFloat" function that returns an error, 
with the "EqualsFloat" function calling that and panicking if the returned 
error isn't nil.
   
   This lets you test your validation error without complex "defer recover" 
code. Users who accept the runtime panic can use the shorter function call, 
and users who want to handle the error themselves can use the "Try" call.
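
A sketch of the Try/panic pairing described above, kept free of Beam types so it runs on its own. The function names `TryCheckThreshold` and `CheckThreshold` and the non-negative rule are hypothetical placeholders for whatever validation the real transform performs.

```go
package main

import (
	"errors"
	"fmt"
)

// TryCheckThreshold holds the real logic and reports problems as an error.
// The non-negative rule is a made-up validation for illustration only.
func TryCheckThreshold(threshold float64) error {
	if threshold < 0 {
		return errors.New("threshold must be non-negative")
	}
	return nil
}

// CheckThreshold is the convenience wrapper: same behavior, but it panics
// instead of returning the error.
func CheckThreshold(threshold float64) {
	if err := TryCheckThreshold(threshold); err != nil {
		panic(err)
	}
}

func main() {
	// A test can assert on the returned error directly, with no defer/recover.
	fmt.Println(TryCheckThreshold(-1) != nil)
	// Callers who are fine with a panic use the shorter form.
	CheckThreshold(0.01)
}
```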

##########
File path: sdks/go/pkg/beam/testing/passert/floats.go
##########
@@ -26,6 +26,56 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+// EqualsFloat checks that two PCollections of floats are equal, with each element
+// being within a specified threshold of its corresponding element. Both PCollections
+// are loaded into memory, sorted, and compared element by element.
+func EqualsFloat(s beam.Scope, observed, expected beam.PCollection, threshold float64) {

Review comment:
       We should likely check that both the observed and expected PCollection 
types are convertible to floats at pipeline construction time.
   
   This is the tricky bit with leaning on "beam.T" in the DoFn's 
ProcessElement function: it doesn't validate all the types. As the transform 
and the code are written, the framework can only validate that the two side 
input types are identical, so this will pass construction if you provide two 
PCollections of strings, but will fail at pipeline runtime, which is undesirable.
   
   So we should check it when the pipeline is constructed, to fail as soon as 
possible. This is a guiding principle: if you can turn a pipeline runtime 
failure into a pipeline construction failure, do it.
   
   e.g., [`beam.ValidateNonCompositeType`](https://github.com/apache/beam/blob/c9f2321d70cac02aa59bd7ea4877cf27209ae4bc/sdks/go/pkg/beam/validate.go#L38)
 gets the reflect.Type out of the PCollection, which can then be checked to 
have a number underneath it or similar (a la 
[validateNonComplexNumber](https://github.com/apache/beam/blob/c9f2321d70cac02aa59bd7ea4877cf27209ae4bc/sdks/go/pkg/beam/transforms/stats/util.go#L54)
 from the stats transforms package).
   (and I just noticed that there's an extra "not" in the comment for that 
*sigh* )




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

