damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859210444
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn)
(*cweInvoker, error) {
func (n *cweInvoker) initCallFn() error {
// Expects a signature of the form:
- // () sdf.WatermarkEstimator
+ // (watermarkState?) sdf.WatermarkEstimator
switch fnT := n.fn.Fn.(type) {
case reflectx.Func0x1:
- n.call = func() sdf.WatermarkEstimator {
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
return fnT.Call0x1().(sdf.WatermarkEstimator)
}
+ case reflectx.Func1x1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+ }
default:
- if len(n.fn.Param) != 0 {
+ switch len(n.fn.Param) {
+ case 0:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ case 1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ n.args[0] = rest
+ return
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ default:
return errors.Errorf("CreateWatermarkEstimator fn %v
has unexpected number of parameters: %v",
n.fn.Fn.Name(), len(n.fn.Param))
}
- n.call = func() sdf.WatermarkEstimator {
- return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
- }
}
return nil
}
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
- return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.RTracker.
Review Comment:
Oops, bad copy from descriptions above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]