lostluck commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r850609223
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method
*funcx.Fn, num int) error {
return nil
}
+// validateIsWatermarkEstimating returns true if watermark estimator methods
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+ var isWatermarkEstimating bool
+ if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+ isWatermarkEstimating = true
+ }
Review Comment:
```suggestion
_, isWatermarkEstimating := fn.methods[createWatermarkEstimatorName]
```
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method
*funcx.Fn, num int) error {
return nil
}
+// validateIsWatermarkEstimating returns true if watermark estimator methods
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+ var isWatermarkEstimating bool
+ if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+ isWatermarkEstimating = true
+ }
+ if !isSdf && isWatermarkEstimating {
+ return false, errors.Errorf("Watermark estimation method %v is
defined on non-splittable DoFn. Watermark"+
Review Comment:
```suggestion
return false, errors.Errorf("watermark estimation method %v is
defined on non-splittable DoFn. Watermark"+
```
https://github.com/golang/go/wiki/CodeReviewComments#error-strings
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions)
ProcessElement(_ context.Context,
}
}
+ if n.cweInv != nil {
+ n.PDo.we = n.cweInv.Invoke()
Review Comment:
For Daniel's benefit: In the contexts where Watermark estimation is being
used, often only "1" element will be processed, since it's an extremely high
fanout from that 1 element. So Bad for micro benchmarking, but probably fine in
practice.
Ultimately, this feeds into my desire to have a single Very Good example of
how to write a Streaming SDF with all the bells and whistles, so folks don't
end up hurting themselves with all the new features we're adding.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]