[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=762601&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762601
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 22:25
            Start Date: 26/Apr/22 22:25
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r859184689


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -168,7 +168,9 @@ const (
        restrictionSizeName          = "RestrictionSize"
        createTrackerName            = "CreateTracker"
 
-       createWatermarkEstimatorName = "CreateWatermarkEstimator"
+       createWatermarkEstimatorName          = "CreateWatermarkEstimator"
+       getInitialWatermarkEstimatorStateName = 
"GetInitialWatermarkEstimatorState"
+       getWatermarkEstimatorStateName        = "GetWatermarkEstimatorState"

Review Comment:
   Drop the "Get" prefixes. Idiomatic Go doesn't prefix field-fetching methods 
with "Get" unnecessarily.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -321,27 +321,38 @@ func newCreateWatermarkEstimatorInvoker(fn *funcx.Fn) 
(*cweInvoker, error) {
 
 func (n *cweInvoker) initCallFn() error {
        // Expects a signature of the form:
-       // () sdf.WatermarkEstimator
+       // (watermarkState?) sdf.WatermarkEstimator
        switch fnT := n.fn.Fn.(type) {
        case reflectx.Func0x1:
-               n.call = func() sdf.WatermarkEstimator {
+               n.call = func(rest interface{}) sdf.WatermarkEstimator {
                        return fnT.Call0x1().(sdf.WatermarkEstimator)
                }
+       case reflectx.Func1x1:
+               n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                       return fnT.Call1x1(rest).(sdf.WatermarkEstimator)
+               }
        default:
-               if len(n.fn.Param) != 0 {
+               switch len(n.fn.Param) {
+               case 0:
+                       n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                               return 
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+                       }
+               case 1:
+                       n.call = func(rest interface{}) sdf.WatermarkEstimator {
+                               n.args[0] = rest
+                               return 
n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
+                       }
+               default:
                        return errors.Errorf("CreateWatermarkEstimator fn %v 
has unexpected number of parameters: %v",
                                n.fn.Fn.Name(), len(n.fn.Param))
                }
-               n.call = func() sdf.WatermarkEstimator {
-                       return n.fn.Fn.Call(n.args)[0].(sdf.WatermarkEstimator)
-               }
        }
        return nil
 }
 
-// Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.WatermarkEstimator.
-func (n *cweInvoker) Invoke() sdf.WatermarkEstimator {
-       return n.call()
+// Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.RTracker.

Review Comment:
   Either the comment is wrong, or the return type is wrong.
   
   ```suggestion
   // Invoke calls CreateWatermarkEstimator given a restriction and returns an 
sdf.WatermarkEstimator.
   ```



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -324,6 +330,27 @@ func (f *SplittableDoFn) WatermarkEstimatorT() 
reflect.Type {
        return f.CreateWatermarkEstimatorFn().Ret[0].T
 }
 
+// IsWatermarkEstimating returns whether the DoFn implements custom watermark 
state.

Review Comment:
   Name typo.
   ```suggestion
   // IsStatefulWatermarkEstimating returns whether the DoFn implements custom 
watermark state.
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -492,7 +537,7 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f 
float64) ([]*FullValue, []
 // behavior is identical). A single restriction split will occur and all 
windows
 // present in the unsplit element will be present in both the resulting primary
 // and residual.
-func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64) 
([]*FullValue, []*FullValue, error) {
+func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64, 
pWeState interface{}, rWeState interface{}) ([]*FullValue, []*FullValue, error) 
{

Review Comment:
   Nit, here and in similar methods below: you can avoid repeating the type for 
consecutive input parameters of an identical type, e.g. `a int, b int, c int, d int` => 
`a, b, c, d int`.
   
   This also works for consecutive fields of the same type in a struct declaration.
   
   ```suggestion
   func (n *ProcessSizedElementsAndRestrictions) singleWindowSplit(f float64, 
pWeState, rWeState interface{}) ([]*FullValue, []*FullValue, error) {
   ```



##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
        }
 
        var rc *coder.Coder
+       // Sdfs will always encode restrictions as KV<restriction, watermark 
state | nil>

Review Comment:
   ```suggestion
        // Sdfs will always encode restrictions as KV<restriction, watermark 
state | bool(false)>
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762601)
    Time Spent: 5h 40m  (was: 5.5h)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to