damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r859854542


##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,97 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       tracker, ok := restTracker.(sdf.BoundableRTracker)
+       if !ok {
+               return nil
+       }
+       switch tracker.(type) {
+       case *offsetrange.Tracker:
+               if tracker.(*offsetrange.Tracker).IsBounded() {
+                       return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction)
+               }
+       default:
+               return nil
+       }
+       return nil

Review Comment:
   ```suggestion
        if tracker, ok := restTracker.(sdf.BoundableRTracker); ok && 
tracker.IsBounded() {
                return tracker.GetRestriction()
        }
        return nil
   ```
   
   It's still not clear to me why we're specifically looking at offsetrange 
trackers - can we do something like this and make this a generic mechanism? (It 
also simplifies things a great deal.)



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,97 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+

Review Comment:
   ```suggestion
   
   ```
   
   I don't _think_ this is used



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -816,11 +849,17 @@ func validateSdfSigNumbers(fn *Fn, num int) error {
                splitRestrictionName:         num + 1,
                restrictionSizeName:          num + 1,
                createTrackerName:            1,
+               truncateRestrictionName:      num + 1,
        }
        returnNum := 1 // TODO(BEAM-3301): Enable optional error params in SDF 
methods.
 
-       for _, name := range requiredSdfNames {
-               method := fn.methods[name]
+       optionalSdfs := optionalSdfNameMap()
+       for _, name := range sdfNames {
+               method, ok := fn.methods[name]
+               if !ok && optionalSdfs[name] {
+                       // skip validating unimplemented optional sdf methods
+                       continue
+               }

Review Comment:
   ```suggestion
        for _, name := range requiredSdfNames {
                method, ok := fn.methods[name]
   ```
   
   Rather than using the optional map, we can just iterate over the 
`requiredSdfNames` list that we already have.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -847,9 +886,15 @@ func validateSdfSigNumbers(fn *Fn, num int) error {
 func validateSdfSigTypes(fn *Fn, num int) error {
        restrictionT := fn.methods[createInitialRestrictionName].Ret[0].T
        rTrackerT := reflect.TypeOf((*sdf.RTracker)(nil)).Elem()
-
-       for _, name := range requiredSdfNames {
-               method := fn.methods[name]
+       bRTrackerT := fn.methods[createTrackerName].Ret[0].T
+       optionalSdfs := optionalSdfNameMap()
+
+       for _, name := range sdfNames {
+               method, ok := fn.methods[name]
+               if !ok && optionalSdfs[name] {
+                       // skip validating unimplemented optional sdf methodsß
+                       continue
+               }

Review Comment:
   I don't think we should be doing this kind of filtering here. If we need it, 
we can use requiredSdfNames instead of sdfNames as suggested above, but more 
importantly, we actually do want the optional sdf functions to be validated 
(that's why you've got your truncateRestrictionName case below).
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,97 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       tracker, ok := restTracker.(sdf.BoundableRTracker)
+       if !ok {
+               return nil
+       }
+       switch tracker.(type) {
+       case *offsetrange.Tracker:
+               if tracker.(*offsetrange.Tracker).IsBounded() {
+                       return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction)
+               }
+       default:
+               return nil
+       }
+       return nil
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func newDefaultTruncateRestriction() (*trInvoker, error) {

Review Comment:
   Nit: this should probably be `newDefaultTruncateRestrictionInvoker` for 
consistency



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -199,6 +201,18 @@ var requiredSdfNames = []string{
        createTrackerName,
 }
 
+var optionalSdfNames = []string{

Review Comment:
   I don't think we need this list (see the other comments in this file).



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -920,6 +965,23 @@ func validateSdfSigTypes(fn *Fn, num int) error {
                                        "return value at index %v. Got: %v, 
Want: %v (from method %v).",
                                        createTrackerName, 0, method.Ret[0].T, 
processFn.Param[pos].T, processElementName)
                        }
+               case truncateRestrictionName:

Review Comment:
   Could you please add test case(s) in fn_test.go to cover this?



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -807,6 +832,14 @@ func validateSdfSignatures(fn *Fn, numMainIn mainInputs) 
error {
        return nil
 }
 
+func optionalSdfNameMap() map[string]bool {
+       sdfMap := make(map[string]bool)
+       for _, name := range optionalSdfNames {
+               sdfMap[name] = true
+       }
+       return sdfMap
+}

Review Comment:
   I don't think we need this (see the comments on the functions below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to