[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=762617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762617
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/22 22:59
Start Date: 26/Apr/22 22:59
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859210444
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn)
(*cweInvoker, error) {
func (n *cweInvoker) initCallFn() error {
// Expects a signature of the form:
- // () sdf.WatermarkEstimator
+ // (watermarkState?) sdf.WatermarkEstimator
switch fnT := n.fn.Fn.(type) {
case reflectx.Func0x1:
- n.call = func() sdf.WatermarkEstimator {
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
return fnT.Call0x1().(sdf.WatermarkEstimator)
}
+ case reflectx.Func1x1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+ }
default:
- if len(n.fn.Param) != 0 {
+ switch len(n.fn.Param) {
+ case 0:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ case 1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ n.args[0] = rest
+ return
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ default:
return errors.Errorf("CreateWatermarkEstimator fn %v
has unexpected number of parameters: %v",
n.fn.Fn.Name(), len(n.fn.Param))
}
- n.call = func() sdf.WatermarkEstimator {
- return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
- }
}
return nil
}
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
- return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.RTracker.
Review Comment:
Oops, bad copy from descriptions above
Issue Time Tracking
-------------------
Worklog Id: (was: 762617)
Time Spent: 5h 50m (was: 5h 40m)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)