[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=757071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-757071
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Apr/22 16:37
            Start Date: 14/Apr/22 16:37
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r850609223


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }

Review Comment:
   ```suggestion
        _, isWatermarkEstimating := fn.methods[createWatermarkEstimatorName]
   ```



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }
+       if !isSdf && isWatermarkEstimating {
+               return false, errors.Errorf("Watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+

Review Comment:
   ```suggestion
                return false, errors.Errorf("watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+
   ```
   https://github.com/golang/go/wiki/CodeReviewComments#error-strings



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) 
ProcessElement(_ context.Context,
                }
        }
 
+       if n.cweInv != nil {
+               n.PDo.we = n.cweInv.Invoke()

Review Comment:
   For Daniel's benefit: In the contexts where Watermark estimation is being 
used, often only "1" element will be processed, since it's an extremely high 
fanout from that 1 element. So Bad for micro benchmarking, but probably fine in 
practice.
   
   Ultimately, this feeds into my desire to have a single Very Good example of 
how to write a Streaming SDF with all the bells and whistles, so folks don't 
end up hurting themselves with all the new features we're adding.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 757071)
    Time Spent: 3h 10m  (was: 3h)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to