lostluck commented on code in PR #17579:
URL: https://github.com/apache/beam/pull/17579#discussion_r868211497
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
Teardown(ctx context.Context) error
}
+type createAccumulator0x1[T any] interface {
+ CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+ CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+ AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+ AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+ MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+ MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+ ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+ ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's
structural functions
Review Comment:
We probably want to call this Combiner1 etc.
They are called CombineFns or Combiners not accumulators. Accumulators are
the intermediate type that CombineFns operate on. See
https://beam.apache.org/documentation/programming-guide/#combine for the usual
user nomenclature.
Eg. for a mean Combinefn, one could have (some number, say `int`) as
AddInput's input type. AddInput would add it to the accumulator type
`struct{Count int, Sum float64}`, MergeAccumulators merges them together, and
ExtractOutput does the division of the Sum by the Count, to get the `float64`
output type for the Mean.
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
Teardown(ctx context.Context) error
}
+type createAccumulator0x1[T any] interface {
+ CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+ CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+ AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+ AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+ MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+ MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+ ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+ ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's
structural functions
+// and types and optimizes their runtime execution. There are 3 different
Accumulator
+// functions, each of which should be used for a different situation.
+{{if (eq $genericParams 1)}}// Accumulator1 should be used when your
accumulator, input, and output are all of the same type.
+// It can be called with register.Accumulator1[T](&CustomAccumulator{})
+// where T is the type of the input/accumulator/output.
+{{else}}{{if (eq $genericParams 2)}}// Accumulator2 should be used when your
accumulator, input, and output are 2 distinct types.
+// It can be called with register.Accumulator2[T1, T2](&CustomAccumulator{})
+// where T1 is the type of the accumulator and T2 is the other type.
+{{else}}// Accumulator3 should be used when your accumulator, input, and
output are 3 distinct types.
+// It can be called with register.Accumulator3[T1, T2,
T3](&CustomAccumulator{})
+// where T1 is the type of the accumulator, T2 is the type of the input, and
T3 is the type of the output.
+{{end}}{{end}}func Accumulator{{$genericParams}}[{{range $paramNum := upto
$genericParams}}{{if $paramNum}}, {{end}}T{{$paramNum}}{{end}} any](accum
interface{}) {
+ registerAccumulatorTypes(accum)
+ accumVal := reflect.ValueOf(accum)
+ var mergeAccumulatorsWrapper func(fn interface{}) reflectx.Func
+ if _, ok := accum.(mergeAccumulators2x2[T0]); ok {
+ caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(T0, T0) (T0, error))
+ return &caller2x2[T0, T0, T0, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0,
error))(nil)).Elem(), caller)
+
+ mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error)
{
+ return
fn.(mergeAccumulators2x2[T0]).MergeAccumulators(a0, a1)
+ })
+ }
+ } else if _, ok := accum.(mergeAccumulators2x1[T0]); ok {
+ caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(T0, T0) T0)
+ return &caller2x1[T0, T0, T0]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0)
T0)(nil)).Elem(), caller)
+
+ mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 {
+ return
fn.(mergeAccumulators2x1[T0]).MergeAccumulators(a0, a1)
+ })
+ }
+ }
+
+ if mergeAccumulatorsWrapper == nil{
Review Comment:
need space after nil
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]