lostluck commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859184689


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -168,7 +168,9 @@ const (
        restrictionSizeName          = "RestrictionSize"
        createTrackerName            = "CreateTracker"
 
-       createWatermarkEstimatorName = "CreateWatermarkEstimator"
+       createWatermarkEstimatorName          = "CreateWatermarkEstimator"
+       getInitialWatermarkEstimatorStateName = 
"GetInitialWatermarkEstimatorState"
+       getWatermarkEstimatorStateName        = "GetWatermarkEstimatorState"

Review Comment:
   Drop the "Get" prefixes. Idiomatic Go doesn't prefix field fetching methods 
with "Get" unnecessarily.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn) 
(*cweInvoker, error) {
 
 func (n *cweInvoker) initCallFn() error {
        // Expects a signature of the form:
-       // () sdf.WatermarkEstimator
+       // (watermarkState?) sdf.WatermarkEstimator
        switch fnT := n.fn.Fn.(type) {
        case reflectx.Func0x1:
-               n.call = func() sdf.WatermarkEstimator {
+               n.call = func(rest interface{}) sdf.WatermarkEstimator {
                        return fnT.Call0x1().(sdf.WatermarkEstimator)
                }
+       case reflectx.Func1x1:
+               n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                       return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+               }
        default:
-               if len(n.fn.Param) != 0 {
+               switch len(n.fn.Param) {
+               case 0:
+                       n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                               return 
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+                       }
+               case 1:
+                       n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                               n.args[0] = rest
+                               return 
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+                       }
+               default:
                        return errors.Errorf("CreateWatermarkEstimator fn %v 
has unexpected number of parameters: %v",
                                n.fn.Fn.Name(), len(n.fn.Param))
                }
-               n.call = func() sdf.WatermarkEstimator {
-                       return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
-               }
        }
        return nil
 }
 
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
-       return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.RTracker.

Review Comment:
   Either the comment is wrong, or the return type was wrong.
   
   ```suggestion
   // Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.WatermarkEstimator.
   ```



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -324,6 +330,27 @@ func (f *SplittableDoFn) WatermarkEstimatorT() 
reflect.Type {
        return f.CreateWatermarkEstimatorFn().Ret[0].T
 }
 
+// IsWatermarkEstimating returns whether the DoFn implements custom watermark 
state.

Review Comment:
   Name typo.
   ```suggestion
   // IsStatefulWatermarkEstimating returns whether the DoFn implements custom 
watermark state.
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -492,7 +537,7 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f 
float64) ([]*FullValue, []
 // behavior is identical). A single restriction split will occur and all 
windows
 // present in the unsplit element will be present in both the resulting primary
 // and residual.
-func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64) 
([]*FullValue, []*FullValue, error) {
+func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64, 
pWeState interface{}, rWeState interface{}) ([]*FullValue, []*FullValue, error) 
{

Review Comment:
   Nit here and similar methods below: You can avoid repeating the type for 
each input parameter of an identical type. eg `a int, b int, c int, d int` => 
`a, b, c, d int` 
   
   This also works for identical fields in a struct declaration.
   
   ```suggestion
   func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64, 
pWeState, rWeState interface{}) ([]*FullValue, []*FullValue, error) {
   ```



##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
        }
 
        var rc *coder.Coder
+       // Sdfs will always encode restrictions as KV<restriction, watermark 
state | nil>

Review Comment:
   ```suggestion
        // Sdfs will always encode restrictions as KV<restriction, watermark 
state | bool(false)>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to