lostluck commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859184689
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -168,7 +168,9 @@ const (
restrictionSizeName = "RestrictionSize"
createTrackerName = "CreateTracker"
- createWatermarkEstimatorName = "CreateWatermarkEstimator"
+ createWatermarkEstimatorName = "CreateWatermarkEstimator"
+ getInitialWatermarkEstimatorStateName =
"GetInitialWatermarkEstimatorState"
+ getWatermarkEstimatorStateName = "GetWatermarkEstimatorState"
Review Comment:
Drop the "Get" prefixes. Idiomatic Go doesn't prefix field-fetching methods
with "Get" unnecessarily.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn)
(*cweInvoker, error) {
func (n *cweInvoker) initCallFn() error {
// Expects a signature of the form:
- // () sdf.WatermarkEstimator
+ // (watermarkState?) sdf.WatermarkEstimator
switch fnT := n.fn.Fn.(type) {
case reflectx.Func0x1:
- n.call = func() sdf.WatermarkEstimator {
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
return fnT.Call0x1().(sdf.WatermarkEstimator)
}
+ case reflectx.Func1x1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+ }
default:
- if len(n.fn.Param) != 0 {
+ switch len(n.fn.Param) {
+ case 0:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ return
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ case 1:
+ n.call = func(rest interface{}) sdf.WatermarkEstimator {
+ n.args[0] = rest
+ return
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+ }
+ default:
return errors.Errorf("CreateWatermarkEstimator fn %v
has unexpected number of parameters: %v",
n.fn.Fn.Name(), len(n.fn.Param))
}
- n.call = func() sdf.WatermarkEstimator {
- return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
- }
}
return nil
}
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
- return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an
sdf.RTracker.
Review Comment:
Either the comment is wrong, or the return type is wrong.
```suggestion
// Invoke calls CreateWatermarkEstimator given a restriction and returns an sdf.WatermarkEstimator.
```
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -324,6 +330,27 @@ func (f *SplittableDoFn) WatermarkEstimatorT()
reflect.Type {
return f.CreateWatermarkEstimatorFn().Ret[0].T
}
+// IsWatermarkEstimating returns whether the DoFn implements custom watermark
state.
Review Comment:
Name typo in the doc comment: it should lead with `IsStatefulWatermarkEstimating`.
```suggestion
// IsStatefulWatermarkEstimating returns whether the DoFn implements custom watermark state.
```
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -492,7 +537,7 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f
float64) ([]*FullValue, []
// behavior is identical). A single restriction split will occur and all
windows
// present in the unsplit element will be present in both the resulting primary
// and residual.
-func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64)
([]*FullValue, []*FullValue, error) {
+func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64,
pWeState interface{}, rWeState interface{}) ([]*FullValue, []*FullValue, error)
{
Review Comment:
Nit here and in similar methods below: you can avoid repeating the type for
consecutive input parameters of an identical type, e.g.
`a int, b int, c int, d int` => `a, b, c, d int`.
This also works for consecutive fields of an identical type in a struct
declaration.
```suggestion
func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64,
pWeState, rWeState interface{}) ([]*FullValue, []*FullValue, error) {
```
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection,
opts ...Option) ([]PCo
}
var rc *coder.Coder
+ // Sdfs will always encode restrictions as KV<restriction, watermark
state | nil>
Review Comment:
```suggestion
// Sdfs will always encode restrictions as KV<restriction, watermark state | bool(false)>
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]