johannaojeling commented on code in PR #25437:
URL: https://github.com/apache/beam/pull/25437#discussion_r1107656944


##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -39,61 +43,45 @@ import (
 
 // cirInvoker is an invoker for CreateInitialRestriction.
 type cirInvoker struct {
-       fn   *funcx.Fn
-       args []any // Cache to avoid allocating new slices per-element.
-       call func(elms *FullValue) (rest any)
+       fn     *funcx.Fn
+       args   []any // Cache to avoid allocating new slices per-element.
+       ctxIdx int
+       call   func() (rest any, err error)
 }
 
 func newCreateInitialRestrictionInvoker(fn *funcx.Fn) (*cirInvoker, error) {
        n := &cirInvoker{
                fn:   fn,
                args: make([]any, len(fn.Param)),
        }
+
+       var ok bool
+       if n.ctxIdx, ok = fn.Context(); !ok {
+               n.ctxIdx = -1
+       }
+
        if err := n.initCallFn(); err != nil {
                return nil, errors.WithContext(err, "sdf CreateInitialRestriction invoker")
        }
        return n, nil
 }
 
-func (n *cirInvoker) initCallFn() error {
-       // Expects a signature of the form:
-       // (key?, value) restriction
-       // TODO(BEAM-9643): Link to full documentation.
-       switch fnT := n.fn.Fn.(type) {
-       case reflectx.Func1x1:

Review Comment:
   Thanks, hm okay, not sure if I fully understand the preferred way forward. 
Do you mean a) we should strive to handle the invocation of SDF methods with 
[exec.Invoke](https://github.com/apache/beam/blob/fd2b16139fd03d3221975ca88fa516f7ad41b6a4/sdks/go/pkg/beam/core/runtime/exec/fn.go#L73)
 and remove the individual SDF invokers, b) there is no need to check the 
reflectx.FuncMxN cases for SDF methods but they should always be invoked with 
the default case `n.fn.Fn.Call()`, or c) something else? 🙂
   
   I don't have the full picture, but what I've seen so far when looking at the 
invocation of ProcessElement is that the reflectx.FuncMxN cases match when the 
DoFn has been registered generically. Currently there seems to be support for 
wrapping the standard methods Setup, StartBundle, ProcessElement, FinishBundle 
and Teardown into these types, but not the SDF methods, so for SDF methods the 
reflectx.FuncMxN cases won't match and the default case will be taken. There 
could be other things going on that I'm unaware of though, so I don't know how 
valid this observation is.



