damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r849748500
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method *funcx.Fn, num int) error {
return nil
}
+// validateIsWatermarkEstimating returns true if watermark estimator methods are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+ var isWatermarkEstimating bool
+ if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+ isWatermarkEstimating = true
+ }
+ if !isSdf && isWatermarkEstimating {
+ return false, errors.Errorf("Watermark estimation method %v is defined on non-splittable DoFn. Watermark"+
+ "estimation is only valid on splittable DoFns", createWatermarkEstimatorName)
+ }
+ return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are valid
+func validateWatermarkSig(fn *Fn) error {
+ paramRange := map[string][]int{
+ createWatermarkEstimatorName: []int{0, 0},
+ }
+ returnNum := 1 // TODO(BEAM-3301): Enable optional error params in SDF methods.
+
+ watermarkEstimatorT := reflect.TypeOf((*sdf.WatermarkEstimator)(nil)).Elem()
+
+ for _, name := range watermarkEstimationNames {
+ if method, ok := fn.methods[name]; ok {
+ if len(method.Param) < paramRange[name][0] || len(method.Param) > paramRange[name][1] {
+ err := errors.Errorf("unexpected number of params in method %v. got: %v, want number in range: %v to %v",
+ name, len(method.Param), paramRange[name][0], paramRange[name][1])
+ return errors.SetTopLevelMsgf(err, "Unexpected number of parameters in method %v. "+
+ "Got: %v, Want number in range: %v to %v. Check that the signature conforms to the expected signature for %v, "+
+ "and that elements in SDF method parameters match elements in %v.",
+ name, len(method.Param), paramRange[name][0], paramRange[name][1], name, processElementName)
Review Comment:
Yeah, this file does that a lot - that's a good idea
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]