damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r849748500


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }
+       if !isSdf && isWatermarkEstimating {
+               return false, errors.Errorf("Watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+
+                       "estimation is only valid on splittable DoFns", 
createWatermarkEstimatorName)
+       }
+       return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are 
valid
+func validateWatermarkSig(fn *Fn) error {
+       paramRange := map[string][]int{
+               createWatermarkEstimatorName: []int{0, 0},
+       }
+       returnNum := 1 // TODO(BEAM-3301): Enable optional error params in SDF 
methods.
+
+       watermarkEstimatorT := 
reflect.TypeOf((*sdf.WatermarkEstimator)(nil)).Elem()
+
+       for _, name := range watermarkEstimationNames {
+               if method, ok := fn.methods[name]; ok {
+                       if len(method.Param) < paramRange[name][0] || 
len(method.Param) > paramRange[name][1] {
+                               err := errors.Errorf("unexpected number of 
params in method %v. got: %v, want number in range: %v to %v",
+                                       name, len(method.Param), 
paramRange[name][0], paramRange[name][1])
+                               return errors.SetTopLevelMsgf(err, "Unexpected 
number of parameters in method %v. "+
+                                       "Got: %v, Want number in range: %v to 
%v. Check that the signature conforms to the expected signature for %v, "+
+                                       "and that elements in SDF method 
parameters match elements in %v.",
+                                       name, len(method.Param), 
paramRange[name][0], paramRange[name][1], name, processElementName)

Review Comment:
   Yeah, this file does that a lot - that's a good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to