[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=762617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762617
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 22:59
            Start Date: 26/Apr/22 22:59
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859210444


##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn) 
(*cweInvoker, error) {
 
 func (n *cweInvoker) initCallFn() error {
        // Expects a signature of the form:
-       // () sdf.WatermarkEstimator
+       // (watermarkState?) sdf.WatermarkEstimator
        switch fnT := n.fn.Fn.(type) {
        case reflectx.Func0x1:
-               n.call = func() sdf.WatermarkEstimator {
+               n.call = func(rest interface{}) sdf.WatermarkEstimator {
                        return fnT.Call0x1().(sdf.WatermarkEstimator)
                }
+       case reflectx.Func1x1:
+               n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                       return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+               }
        default:
-               if len(n.fn.Param) != 0 {
+               switch len(n.fn.Param) {
+               case 0:
+                       n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                               return 
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+                       }
+               case 1:
+                       n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                               n.args[0] = rest
+                               return 
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+                       }
+               default:
                        return errors.Errorf("CreateWatermarkEstimator fn %v 
has unexpected number of parameters: %v",
                                n.fn.Fn.Name(), len(n.fn.Param))
                }
-               n.call = func() sdf.WatermarkEstimator {
-                       return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
-               }
        }
        return nil
 }
 
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
-       return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.RTracker.

Review Comment:
   Oops, bad copy from descriptions above





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762617)
    Time Spent: 5h 50m  (was: 5h 40m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to