[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=755701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755701
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/22 11:34
            Start Date: 12/Apr/22 11:34
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848330724


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -301,6 +308,24 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
        return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// IsWatermarkEstimating returns whether the DoFn implements a custom 
watermark estimator.
+func (f *SplittableDoFn) IsWatermarkEstimating() bool {
+       // Validation already passed, so if one SDF method is present they 
should
+       // all be present.

Review Comment:
   Whoops, good catch



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +950,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }
+       if !isSdf && isWatermarkEstimating {
+               return false, errors.Errorf("Watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+
+                       "estimation is only valid on splittable DoFns", 
createWatermarkEstimatorName)
+       }
+       return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are 
valid
+func validateWatermarkSig(fn *Fn) error {
+       paramRange := map[string][]int{

Review Comment:
   Yeah, there will definitely be more added in the future (I actually already 
have those changes locally 😄 )





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755701)
    Time Spent: 1.5h  (was: 1h 20m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to