[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=757071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-757071
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Apr/22 16:37
Start Date: 14/Apr/22 16:37
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r850609223
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method *funcx.Fn, num int) error {
return nil
}
+// validateIsWatermarkEstimating returns true if watermark estimator methods are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+ var isWatermarkEstimating bool
+ if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+ isWatermarkEstimating = true
+ }
Review Comment:
```suggestion
_, isWatermarkEstimating := fn.methods[createWatermarkEstimatorName]
```
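To illustrate the suggestion above: a minimal, self-contained sketch of the comma-ok map lookup, showing that the one-line form yields the same boolean as the `var` plus `if` form. The `methods` map and both helper names are hypothetical stand-ins, not the actual `Fn` type from `fn.go`.

```go
package main

import "fmt"

// hasMethodVerbose mirrors the original shape: declare a bool, then set it
// inside an if statement. Hypothetical helper for illustration only.
func hasMethodVerbose(methods map[string]struct{}, name string) bool {
	var present bool
	if _, ok := methods[name]; ok {
		present = true
	}
	return present
}

// hasMethod mirrors the suggested shape: the second result of a map index
// expression is already the boolean we want.
func hasMethod(methods map[string]struct{}, name string) bool {
	_, present := methods[name]
	return present
}

func main() {
	// Assumed method set; the real one lives on graph.Fn.
	methods := map[string]struct{}{"CreateWatermarkEstimator": {}}
	for _, name := range []string{"CreateWatermarkEstimator", "ProcessElement"} {
		fmt.Println(name, hasMethodVerbose(methods, name), hasMethod(methods, name))
	}
}
```

Both helpers agree for present and absent keys, so the shorter form is a behavior-preserving simplification.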
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method *funcx.Fn, num int) error {
return nil
}
+// validateIsWatermarkEstimating returns true if watermark estimator methods are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+ var isWatermarkEstimating bool
+ if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+ isWatermarkEstimating = true
+ }
+ if !isSdf && isWatermarkEstimating {
+ return false, errors.Errorf("Watermark estimation method %v is defined on non-splittable DoFn. Watermark"+
Review Comment:
```suggestion
return false, errors.Errorf("watermark estimation method %v is defined on non-splittable DoFn. Watermark"+
```
https://github.com/golang/go/wiki/CodeReviewComments#error-strings
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions)
ProcessElement(_ context.Context,
}
}
+ if n.cweInv != nil {
+ n.PDo.we = n.cweInv.Invoke()
Review Comment:
For Daniel's benefit: in the contexts where watermark estimation is used,
often only one element is processed, because that single element has an
extremely high fanout. So invoking this once per element is bad for
microbenchmarking, but probably fine in practice.
Ultimately, this feeds into my desire to have a single Very Good example of
how to write a Streaming SDF with all the bells and whistles, so folks don't
end up hurting themselves with all the new features we're adding.
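The amortization argument in this comment can be sketched roughly as follows. This is a toy model, not the Beam execution code: `estimator`, `newEstimator`, and `processElement` are hypothetical names, and the fanout value is made up for illustration.

```go
package main

import "fmt"

// estimator is a hypothetical stand-in for a watermark estimator.
type estimator struct{ observed int }

// created counts how many estimators have been constructed.
var created int

// newEstimator models the per-element estimator creation.
func newEstimator() *estimator {
	created++
	return &estimator{}
}

// processElement creates one estimator for the element and then produces
// fanout outputs, each observed by that same estimator. It returns the
// number of outputs observed.
func processElement(fanout int) int {
	we := newEstimator()
	for i := 0; i < fanout; i++ {
		we.observed++
	}
	return we.observed
}

func main() {
	// One input element with a large (assumed) fanout.
	outputs := processElement(100000)
	fmt.Printf("estimators created: %d, outputs emitted: %d\n", created, outputs)
}
```

In this model the creation cost is paid once per input element, so it is spread over every output of that element. A microbenchmark that feeds many low-fanout elements would instead pay it on nearly every iteration.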
Issue Time Tracking
-------------------
Worklog Id: (was: 757071)
Time Spent: 3h 10m (was: 3h)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)