damccorm commented on code in PR #17579:
URL: https://github.com/apache/beam/pull/17579#discussion_r868223691


##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
        Teardown(ctx context.Context) error
 }
 
+type createAccumulator0x1[T any] interface {
+    CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+    CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+    AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+    AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+    MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+    MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+    ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+    ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's 
structural functions
+// and types and optimizes their runtime execution. There are 3 different 
Accumulator
+// functions, each of which should be used for a different situation.
+{{if (eq $genericParams 1)}}// Accumulator1 should be used when your 
accumulator, input, and output are all of the same type.
+// It can be called with register.Accumulator1[T](&CustomAccumulator{})
+// where T is the type of the input/accumulator/output.
+{{else}}{{if (eq $genericParams 2)}}// Accumulator2 should be used when your 
accumulator, input, and output are 2 distinct types.
+// It can be called with register.Accumulator2[T1, T2](&CustomAccumulator{})
+// where T1 is the type of the accumulator and T2 is the other type.
+{{else}}// Accumulator3 should be used when your accumulator, input, and 
output are 3 distinct types.
+// It can be called with register.Accumulator3[T1, T2, 
T3](&CustomAccumulator{})
+// where T1 is the type of the accumulator, T2 is the type of the input, and 
T3 is the type of the output.
+{{end}}{{end}}func Accumulator{{$genericParams}}[{{range $paramNum := upto 
$genericParams}}{{if $paramNum}}, {{end}}T{{$paramNum}}{{end}} any](accum 
interface{}) {
+    registerAccumulatorTypes(accum)
+    accumVal := reflect.ValueOf(accum)
+    var mergeAccumulatorsWrapper func(fn interface{}) reflectx.Func
+    if _, ok := accum.(mergeAccumulators2x2[T0]); ok {
+        caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(T0, T0) (T0, error))
+            return &caller2x2[T0, T0, T0, error]{fn: f}
+        }
+               reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0, 
error))(nil)).Elem(), caller)
+
+        mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+                       return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error) 
{
+                               return 
fn.(mergeAccumulators2x2[T0]).MergeAccumulators(a0, a1)
+                       })
+               }
+    } else if _, ok := accum.(mergeAccumulators2x1[T0]); ok {
+        caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(T0, T0) T0)
+            return &caller2x1[T0, T0, T0]{fn: f}
+        }
+               reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) 
T0)(nil)).Elem(), caller)
+
+        mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+                       return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 {
+                               return 
fn.(mergeAccumulators2x1[T0]).MergeAccumulators(a0, a1)
+                       })
+               }
+    }
+
+    if mergeAccumulatorsWrapper == nil{

Review Comment:
   Done!



##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
        Teardown(ctx context.Context) error
 }
 
+type createAccumulator0x1[T any] interface {
+    CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+    CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+    AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+    AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+    MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+    MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+    ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+    ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's 
structural functions

Review Comment:
   Yeah, that makes sense - updated!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to