lostluck commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r863119527


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -199,6 +201,18 @@ var requiredSdfNames = []string{
        createTrackerName,
 }
 
+var optionalSdfNames = []string{
+       truncateRestrictionName,
+}
+
+var sdfNames = []string{

Review Comment:
   Consider not needing to re-specify explicitly, so we don't end up with 
additional places to update: eg https://go.dev/play/p/YAa2Xtx5aba



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()

Review Comment:
   This isn't used AFAICT, please delete it.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -847,6 +876,7 @@ func validateSdfSigNumbers(fn *Fn, num int) error {
 func validateSdfSigTypes(fn *Fn, num int) error {
        restrictionT := fn.methods[createInitialRestrictionName].Ret[0].T
        rTrackerT := reflect.TypeOf((*sdf.RTracker)(nil)).Elem()
+       bRTrackerT := fn.methods[createTrackerName].Ret[0].T

Review Comment:
   Nits:
   Please move this closer to it's use , right before the for loop that it's 
using.
   
   Consider a more descriptive short name. "b" as a prefix doesn't denote 
anything here, a better name for this would be `rTrackerImplT` since 
`rTrackerT` represents the interface type.
   



##########
sdks/go/pkg/beam/core/sdf/lock.go:
##########
@@ -80,3 +80,73 @@ func (rt *LockRTracker) GetRestriction() interface{} {
        defer rt.Mu.Unlock()
        return rt.Rt.GetRestriction()
 }
+
+// NewLockBoundableRTracker creates a LockBoundableRTracker initialized with 
the specified
+// restriction tracker as its underlying restriction tracker.
+func NewLockBoundableRTracker(rt BoundableRTracker) *LockRTracker {
+       return &LockRTracker{Rt: rt}
+}
+
+// LockBoundableRTracker is a restriction tracker that wraps another 
restriction
+// tracker and adds thread safety to it by locking a mutex in each method,
+// before delegating to the underlying tracker.
+type LockBoundableRTracker struct {

Review Comment:
   Is it necessary to have a new type, and duplicating all the methods, instead 
of adding to the existing RTracker?
   
   Assume as default, if there is no IsBounded method (determinable with a type 
assertion), that the restriction is bounded, like we would if the 
restriction/tracker didn't have the method.
   
   Type assertions are fast, and we can always delegate to the underlying 
tracker when available in the one method.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       if tracker, ok := restTracker.(sdf.BoundableRTracker); ok && 
tracker.IsBounded() {
+               tracker.GetRestriction()

Review Comment:
   Shouldn't this be 
   ```suggestion
                return tracker.GetRestriction()
   ```
   ?
   
   Also, shouldn't that be for `!tracker.IsBounded()` so we're passing 
unbounded restrictions out without change, not for bounded restrictions?
   
   Either way, please add a unit test for the default behavior for both bounded 
and unbounded and ensure we have the correct behavior for each.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{

Review Comment:
   This function isn't used outside of the exec package, please unexport it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to