[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=762601&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762601
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/22 22:25
Start Date: 26/Apr/22 22:25
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859184689
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -168,7 +168,9 @@ const (
restrictionSizeName = "RestrictionSize"
createTrackerName = "CreateTracker"
- createWatermarkEstimatorName = "CreateWatermarkEstimator"
+ createWatermarkEstimatorName = "CreateWatermarkEstimator"
+ getInitialWatermarkEstimatorStateName =
"GetInitialWatermarkEstimatorState"
+ getWatermarkEstimatorStateName = "GetWatermarkEstimatorState"
Review Comment:
Drop the "Get" prefixes. Idiomatic Go doesn't prefix field-fetching methods
with "Get" unnecessarily.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn)
(*cweInvoker, error) {
func (n *cweInvoker) initCallFn() error {
// Expects a signature of the form:
- // () sdf.WatermarkEstimator
+ // (watermarkState?) sdf.WatermarkEstimator
switch fnT := n.fn.Fn.(type) {
case reflectx.Func0x1:
- n.call = func() sdf.WatermarkEstimator {
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
return fnT.Call0x1().(sdf.WatermarkEstimator)
}
+ case reflectx.Func1x1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+ }
default:
- if len(n.fn.Param) != 0 {
+ switch len(n.fn.Param) {
+ case 0:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ case 1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ n.args[0] = rest
+ return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ default:
return errors.Errorf("CreateWatermarkEstimator fn %v has unexpected number of parameters: %v", n.fn.Fn.Name(), len(n.fn.Param))
}
- n.call = func() sdf.WatermarkEstimator {
- return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
- }
}
return nil
}
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
- return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.RTracker.
Review Comment:
Either the comment is wrong or the return type is wrong.
```suggestion
// Invoke calls CreateWatermarkEstimator given a restriction and returns an sdf.WatermarkEstimator.
```
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -324,6 +330,27 @@ func (f *SplittableDoFn) WatermarkEstimatorT()
reflect.Type {
return f.CreateWatermarkEstimatorFn().Ret[0].T
}
+// IsWatermarkEstimating returns whether the DoFn implements custom watermark
state.
Review Comment:
Name typo in the doc comment.
```suggestion
// IsStatefulWatermarkEstimating returns whether the DoFn implements custom watermark state.
```
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -492,7 +537,7 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f
float64) ([]*FullValue, []
// behavior is identical). A single restriction split will occur and all
windows
// present in the unsplit element will be present in both the resulting primary
// and residual.
-func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64)
([]*FullValue, []*FullValue, error) {
+func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64,
pWeState interface{}, rWeState interface{}) ([]*FullValue, []*FullValue, error)
{
Review Comment:
Nit here and in similar methods below: you can avoid repeating the type for
consecutive input parameters of an identical type, e.g.
`a int, b int, c int, d int` => `a, b, c, d int`.
This also works for consecutive fields of an identical type in a struct
declaration.
```suggestion
func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64,
pWeState, rWeState interface{}) ([]*FullValue, []*FullValue, error) {
```
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection,
opts ...Option) ([]PCo
}
var rc *coder.Coder
+ // Sdfs will always encode restrictions as KV<restriction, watermark
state | nil>
Review Comment:
```suggestion
// Sdfs will always encode restrictions as KV<restriction, watermark state | bool(false)>
```
Issue Time Tracking
-------------------
Worklog Id: (was: 762601)
Time Spent: 5h 40m (was: 5.5h)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 5h 40m
> Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)