[ 
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=762711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762711
 ]

ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Apr/22 05:14
            Start Date: 27/Apr/22 05:14
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r859383238


##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,133 @@ func upto(i int) []int {
        }
        return ret
 }
+
+func add(i int, j int) int {
+       return i + j
+}
+
+func mult(i int, j int) int {
+       return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+       dict := make(map[string]interface{}, len(values)/2)
+       if len(values)%2 != 0 {
+               panic("Invalid dictionary call")
+       }
+       for i := 0; i < len(values); i += 2 {
+               dict[values[i].(string)] = values[i+1]
+       }
+
+       return dict
+}
+
+func list(values ...string) []string {
+       return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+       seenElements := false
+       typing := ""
+       if in > 0 {
+               typing += fmt.Sprintf("[I%v", 0)
+               for i := 1; i < in; i++ {
+                       typing += fmt.Sprintf(", I%v", i)
+               }
+               seenElements = true
+       }
+       if out > 0 {
+               i := 0
+               if !seenElements {
+                       typing += fmt.Sprintf("[R%v", 0)
+                       i++
+               }
+               for i < out {
+                       typing += fmt.Sprintf(", R%v", i)
+                       i++
+               }
+               seenElements = true
+       }
+
+       if seenElements {
+               if includeType {
+                       typing += " any"
+               }
+               typing += "]"
+       }
+
+       return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, 
processElementInInterface interface{}) [][]string {
+       numIn := numInInterface.(int)
+       processElementIn := processElementInInterface.(int)
+       ordered_known_parameter_options := []string{"context.Context", 
"typex.PaneInfo", "[]typex.Window", "typex.EventTime", 
"typex.BundleFinalization"}
+       // Because of how Bundle lifecycle functions are invoked, all known 
parameters must preced unknown options and be in order.
+       // Once we hit an unknown options, all remaining unknown options must 
be included since all iters/emitters must be included
+       // Therefore, we can generate a powerset of the known options and fill 
out any remaining parameters with an ordered set of remaining unknown options
+       pSetSize := int(math.Pow(2, 
float64(len(ordered_known_parameter_options))))
+       combos := make([][]string, 0, pSetSize)
+
+       var index int
+       for index < pSetSize {

Review Comment:
   There doesn't seem to be any reason not to use a standard `for index := 0; 
index < pSetSize; index++ {` construct here.
   
   (Don't forget to remove the `index++` from the bottom of the loop.)



##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}

Review Comment:
   I strongly recommend we have the registration methods in a *different* package 
than the main user surface beam package. Keeping them in the beam package would 
make the package doc much harder to use.  
   
   If the package is called `register` then the methods become 
`register.DoFnNxMxO` etc. which read pretty cleanly. This also avoids forcing 
generics into the main beam package.



##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
+{{$funcName := "unknown"}}{{$structName := "unknown"}}{{if (eq .func 
"startBundle")}}{{$funcName = "startBundle"}}{{$structName = 
"StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$funcName = 
"finishBundle"}}{{$structName = "FinishBundle"}}{{end}}
+       {{$funcName}}In := -1
+       {{$funcName}}Out := -1
+    var {{$funcName}}Wrapper func(fn interface{}) reflectx.Func
+       {{$funcName}}Method := 
reflect.ValueOf(doFn).MethodByName("{{$structName}}")
+       if {{$funcName}}Method.IsValid() {
+               {{$funcName}}In = {{$funcName}}Method.Type().NumIn()
+               {{$funcName}}Out = {{$funcName}}Method.Type().NumOut()
+           switch {
+{{range $funcIn := upto 8}}
+    case {{$funcName}}In == {{$funcIn}}:
+            switch { {{range $funcOut := upto 2}}{{$possibleCombos := 
(possibleBundleLifecycleParameterCombos $funcIn $.processElementIn)}}{{if 
$possibleCombos}}
+            case {{$funcName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first 
= false}}                {{else}} else {{end}}if _, ok := 
doFn.({{$funcName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join 
$funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); 
ok {
+                    {{$funcName}}Caller := func(fn interface{}) reflectx.Func {
+                        f := fn.(func({{(join $funcCombo ", ")}}){{if 
$funcOut}} error{{end}})
+                        return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn 
$funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, 
{{end}}error{{end}}]{{end}}{fn: f}
+                    }
+                    reflectx.RegisterFunc(reflect.TypeOf((*func({{(join 
$funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), 
{{$funcName}}Caller)
+                    {{$funcRegister := (makeStructRegisterEntry $funcName 
$structName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = 
(makeStructRegisterEntry $funcName $structName $funcCombo (list 
"error"))}}{{end}}
+                    {{$funcName}}Wrapper = func(fn interface{}) reflectx.Func {
+                        return {{$funcRegister}}
+                    }
+                } {{end}}{{end}}{{end}}
+            default:
+                panic("Invalid signature for {{$structName}}")
+            }
+{{end}}
+        default:
+            panic("Invalid signature for {{$structName}}")
+        }
+    }
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam
+
+import (
+       "context"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+{{$processElementMaxOut := 5}}{{$processElementMaxIn := 
9}}{{$startFinishBundleOutRange := 2}}{{$startFinishBundleInRange := 8}}{{range 
$processElementOut := upto $processElementMaxOut}}{{range $processElementIn := 
upto $processElementMaxIn}}
+type 
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation 
$processElementIn $processElementOut true)}} interface {
+    ProcessElement({{range $in := upto $processElementIn}}{{if $in}}, 
{{end}}i{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+type 
caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut true)}} struct {
+    fn func({{range $in := upto $processElementIn}}{{if $in}}, 
{{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) Name() string {
+       return reflectx.FunctionName(c.fn)
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) Type() reflect.Type {
+       return reflect.TypeOf(c.fn)
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) Call(args []interface{}) 
[]interface{} {
+    {{if $processElementOut}}{{range $out := upto $processElementOut}}{{if 
$out}}, {{end}}out{{$out}}{{end}} := {{end}}c.fn({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}args[{{$in}}].(I{{$in}}){{end}})
+       return []interface{}{ {{if $processElementOut}}{{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}out{{$out}}{{end}}{{end}} }
+}
+
+func (c 
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}) 
Call{{$processElementIn}}x{{$processElementOut}}({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}arg{{$in}} I{{$in}}{{end}}){{if 
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, 
{{end}}interface{}{{end}}){{end}} {
+    {{if $processElementOut}}return {{end}}c.fn({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}arg{{$in}}{{end}})
+}
+
+func 
registerDoFn{{$processElementIn}}x{{$processElementOut}}StructWrappersAndFuncs{{(genericTypingRepresentation
 $processElementIn $processElementOut true)}}(doFn 
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation 
$processElementIn $processElementOut false)}}) {
+    processElementCaller := func(fn interface{}) reflectx.Func {
+               f := fn.(func({{range $in := upto $processElementIn}}{{if 
$in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto 
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}})
+               return 
&caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}{fn: f}
+       }
+       reflectx.RegisterFunc(reflect.TypeOf((*func({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} 
({{range $out := upto $processElementOut}}{{if $out}}, 
{{end}}R{{$out}}{{end}}){{end}})(nil)).Elem(), processElementCaller)
+    processElementWrapper := func(fn interface{}) reflectx.Func {
+        return reflectx.MakeFunc(func({{range $in := upto 
$processElementIn}}{{if $in}}, {{end}}a{{$in}} I{{$in}}{{end}}){{if 
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, 
{{end}}R{{$out}}{{end}}){{end}} {
+            {{if $processElementOut}}return 
{{end}}fn.(doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
 $processElementIn $processElementOut false)}}).ProcessElement({{range $in := 
upto $processElementIn}}{{if $in}}, {{end}}a{{$in}}{{end}})
+        })
+    }{{template "StructWrappersAndFuncs_StartFinishBundle" (dict 
"processElementIn" $processElementIn "processElementOut" $processElementOut 
"func" "startBundle")}}{{template "StructWrappersAndFuncs_StartFinishBundle" 
(dict "processElementIn" $processElementIn "processElementOut" 
$processElementOut "func" "finishBundle")}}
+    var setupWrapper func(fn interface{}) reflectx.Func
+    if _, ok := doFn.(setup0x0); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), 
setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.(setup0x0).Setup()
+            })
+        }
+    } else if _, ok := doFn.(setup1x0); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), 
setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.(setup1x0).Setup(a0)
+            })
+        }
+    } else if _, ok := doFn.(setup0x1); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), 
setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.(setup0x1).Setup()
+            })
+        }
+    } else if _, ok := doFn.(setup1x1); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) 
error)(nil)).Elem(), setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.(setup1x1).Setup(a0)
+            })
+        }
+    }
+    var teardownWrapper func(fn interface{}) reflectx.Func
+    if _, ok := doFn.(teardown0x0); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), 
teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.(teardown0x0).Teardown()
+            })
+        }
+    } else if _, ok := doFn.(teardown1x0); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), 
teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.(teardown1x0).Teardown(a0)
+            })
+        }
+    } else if _, ok := doFn.(teardown0x1); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), 
teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.(teardown0x1).Teardown()
+            })
+        }
+    } else if _, ok := doFn.(teardown1x1); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) 
error)(nil)).Elem(), teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.(teardown1x1).Teardown(a0)
+            })
+        }
+    }
+    wrapperFn := func(fn interface{}) map[string]reflectx.Func {
+        m := map[string]reflectx.Func{}
+        if processElementWrapper != nil {
+            m["ProcessElement"] = processElementWrapper(fn)
+        }
+        if startBundleWrapper != nil {
+            m["StartBundle"] = startBundleWrapper(fn)
+        }
+        if finishBundleWrapper != nil {
+            m["FinishBundle"] = finishBundleWrapper(fn)
+        }
+        if setupWrapper != nil {
+            m["Setup"] = setupWrapper(fn)
+        }
+        if teardownWrapper != nil {
+            m["Teardown"] = teardownWrapper(fn)
+        }
+        
+        return m
+    }

Review Comment:
   This is probably something you had in mind for helping reduce the size of 
the generated code since a quick skim reads pretty static...
   
   Do recall that if a helper doesn't need arity assistance, it can probably be 
put into a non-generated file in the same package, and tested normally.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762711)
    Time Spent: 2.5h  (was: 2h 20m)

> [Go SDK] Allow users to optimize DoFns with a single generic registration 
> function
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-14347
>                 URL: https://issues.apache.org/jira/browse/BEAM-14347
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Danny McCormick
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator. 
> This update allows them to use generics instead.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to