[
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=762711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762711
]
ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Apr/22 05:14
Start Date: 27/Apr/22 05:14
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r859383238
##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,133 @@ func upto(i int) []int {
}
return ret
}
+
+func add(i int, j int) int {
+ return i + j
+}
+
+func mult(i int, j int) int {
+ return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+ dict := make(map[string]interface{}, len(values)/2)
+ if len(values)%2 != 0 {
+ panic("Invalid dictionary call")
+ }
+ for i := 0; i < len(values); i += 2 {
+ dict[values[i].(string)] = values[i+1]
+ }
+
+ return dict
+}
+
+func list(values ...string) []string {
+ return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+ seenElements := false
+ typing := ""
+ if in > 0 {
+ typing += fmt.Sprintf("[I%v", 0)
+ for i := 1; i < in; i++ {
+ typing += fmt.Sprintf(", I%v", i)
+ }
+ seenElements = true
+ }
+ if out > 0 {
+ i := 0
+ if !seenElements {
+ typing += fmt.Sprintf("[R%v", 0)
+ i++
+ }
+ for i < out {
+ typing += fmt.Sprintf(", R%v", i)
+ i++
+ }
+ seenElements = true
+ }
+
+ if seenElements {
+ if includeType {
+ typing += " any"
+ }
+ typing += "]"
+ }
+
+ return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{},
processElementInInterface interface{}) [][]string {
+ numIn := numInInterface.(int)
+ processElementIn := processElementInInterface.(int)
+ ordered_known_parameter_options := []string{"context.Context",
"typex.PaneInfo", "[]typex.Window", "typex.EventTime",
"typex.BundleFinalization"}
+ // Because of how Bundle lifecycle functions are invoked, all known
parameters must preced unknown options and be in order.
+ // Once we hit an unknown options, all remaining unknown options must
be included since all iters/emitters must be included
+ // Therefore, we can generate a powerset of the known options and fill
out any remaining parameters with an ordered set of remaining unknown options
+ pSetSize := int(math.Pow(2,
float64(len(ordered_known_parameter_options))))
+ combos := make([][]string, 0, pSetSize)
+
+ var index int
+ for index < pSetSize {
Review Comment:
There doesn't seem to be any reason not to use a standard `for index := 0;
index < pSetSize; index++ {` construct here.
(Don't forget to remove the `index++` from the bottom of the loop.)
##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
Review Comment:
I strongly recommend we have the registration methods in a *different* package
than the main user surface beam package. Putting them in the beam package would
make its package doc much harder to use.
If the package is called `register` then the methods become
`register.DoFnNxMxO` etc. which read pretty cleanly. This also avoids forcing
generics into the main beam package.
##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
+{{$funcName := "unknown"}}{{$structName := "unknown"}}{{if (eq .func
"startBundle")}}{{$funcName = "startBundle"}}{{$structName =
"StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$funcName =
"finishBundle"}}{{$structName = "FinishBundle"}}{{end}}
+ {{$funcName}}In := -1
+ {{$funcName}}Out := -1
+ var {{$funcName}}Wrapper func(fn interface{}) reflectx.Func
+ {{$funcName}}Method :=
reflect.ValueOf(doFn).MethodByName("{{$structName}}")
+ if {{$funcName}}Method.IsValid() {
+ {{$funcName}}In = {{$funcName}}Method.Type().NumIn()
+ {{$funcName}}Out = {{$funcName}}Method.Type().NumOut()
+ switch {
+{{range $funcIn := upto 8}}
+ case {{$funcName}}In == {{$funcIn}}:
+ switch { {{range $funcOut := upto 2}}{{$possibleCombos :=
(possibleBundleLifecycleParameterCombos $funcIn $.processElementIn)}}{{if
$possibleCombos}}
+ case {{$funcName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first
= false}} {{else}} else {{end}}if _, ok :=
doFn.({{$funcName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join
$funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}});
ok {
+ {{$funcName}}Caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func({{(join $funcCombo ", ")}}){{if
$funcOut}} error{{end}})
+ return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn
$funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}},
{{end}}error{{end}}]{{end}}{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func({{(join
$funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(),
{{$funcName}}Caller)
+ {{$funcRegister := (makeStructRegisterEntry $funcName
$structName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister =
(makeStructRegisterEntry $funcName $structName $funcCombo (list
"error"))}}{{end}}
+ {{$funcName}}Wrapper = func(fn interface{}) reflectx.Func {
+ return {{$funcRegister}}
+ }
+ } {{end}}{{end}}{{end}}
+ default:
+ panic("Invalid signature for {{$structName}}")
+ }
+{{end}}
+ default:
+ panic("Invalid signature for {{$structName}}")
+ }
+ }
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam
+
+import (
+ "context"
+ "reflect"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+{{$processElementMaxOut := 5}}{{$processElementMaxIn :=
9}}{{$startFinishBundleOutRange := 2}}{{$startFinishBundleInRange := 8}}{{range
$processElementOut := upto $processElementMaxOut}}{{range $processElementIn :=
upto $processElementMaxIn}}
+type
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut true)}} interface {
+ ProcessElement({{range $in := upto $processElementIn}}{{if $in}},
{{end}}i{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+type
caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut true)}} struct {
+ fn func({{range $in := upto $processElementIn}}{{if $in}},
{{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) Call(args []interface{})
[]interface{} {
+ {{if $processElementOut}}{{range $out := upto $processElementOut}}{{if
$out}}, {{end}}out{{$out}}{{end}} := {{end}}c.fn({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}args[{{$in}}].(I{{$in}}){{end}})
+ return []interface{}{ {{if $processElementOut}}{{range $out := upto
$processElementOut}}{{if $out}}, {{end}}out{{$out}}{{end}}{{end}} }
+}
+
+func (c
*caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}})
Call{{$processElementIn}}x{{$processElementOut}}({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}arg{{$in}} I{{$in}}{{end}}){{if
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}},
{{end}}interface{}{{end}}){{end}} {
+ {{if $processElementOut}}return {{end}}c.fn({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}arg{{$in}}{{end}})
+}
+
+func
registerDoFn{{$processElementIn}}x{{$processElementOut}}StructWrappersAndFuncs{{(genericTypingRepresentation
$processElementIn $processElementOut true)}}(doFn
doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}) {
+ processElementCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func({{range $in := upto $processElementIn}}{{if
$in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto
$processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}})
+ return
&caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}}
({{range $out := upto $processElementOut}}{{if $out}},
{{end}}R{{$out}}{{end}}){{end}})(nil)).Elem(), processElementCaller)
+ processElementWrapper := func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func({{range $in := upto
$processElementIn}}{{if $in}}, {{end}}a{{$in}} I{{$in}}{{end}}){{if
$processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}},
{{end}}R{{$out}}{{end}}){{end}} {
+ {{if $processElementOut}}return
{{end}}fn.(doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation
$processElementIn $processElementOut false)}}).ProcessElement({{range $in :=
upto $processElementIn}}{{if $in}}, {{end}}a{{$in}}{{end}})
+ })
+ }{{template "StructWrappersAndFuncs_StartFinishBundle" (dict
"processElementIn" $processElementIn "processElementOut" $processElementOut
"func" "startBundle")}}{{template "StructWrappersAndFuncs_StartFinishBundle"
(dict "processElementIn" $processElementIn "processElementOut"
$processElementOut "func" "finishBundle")}}
+ var setupWrapper func(fn interface{}) reflectx.Func
+ if _, ok := doFn.(setup0x0); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func())
+ return &caller0x0{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(),
setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() {
+ fn.(setup0x0).Setup()
+ })
+ }
+ } else if _, ok := doFn.(setup1x0); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context))
+ return &caller1x0[context.Context]{fn: f}
+ }
+
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(),
setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) {
+ fn.(setup1x0).Setup(a0)
+ })
+ }
+ } else if _, ok := doFn.(setup0x1); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func() error)
+ return &caller0x1[error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(),
setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() error {
+ return fn.(setup0x1).Setup()
+ })
+ }
+ } else if _, ok := doFn.(setup1x1); ok {
+ setupCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context) error)
+ return &caller1x1[context.Context, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context)
error)(nil)).Elem(), setupCaller)
+
+ setupWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) error {
+ return fn.(setup1x1).Setup(a0)
+ })
+ }
+ }
+ var teardownWrapper func(fn interface{}) reflectx.Func
+ if _, ok := doFn.(teardown0x0); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func())
+ return &caller0x0{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(),
teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() {
+ fn.(teardown0x0).Teardown()
+ })
+ }
+ } else if _, ok := doFn.(teardown1x0); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context))
+ return &caller1x0[context.Context]{fn: f}
+ }
+
reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(),
teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) {
+ fn.(teardown1x0).Teardown(a0)
+ })
+ }
+ } else if _, ok := doFn.(teardown0x1); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func() error)
+ return &caller0x1[error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(),
teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() error {
+ return fn.(teardown0x1).Teardown()
+ })
+ }
+ } else if _, ok := doFn.(teardown1x1); ok {
+ teardownCaller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context) error)
+ return &caller1x1[context.Context, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context)
error)(nil)).Elem(), teardownCaller)
+
+ teardownWrapper = func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) error {
+ return fn.(teardown1x1).Teardown(a0)
+ })
+ }
+ }
+ wrapperFn := func(fn interface{}) map[string]reflectx.Func {
+ m := map[string]reflectx.Func{}
+ if processElementWrapper != nil {
+ m["ProcessElement"] = processElementWrapper(fn)
+ }
+ if startBundleWrapper != nil {
+ m["StartBundle"] = startBundleWrapper(fn)
+ }
+ if finishBundleWrapper != nil {
+ m["FinishBundle"] = finishBundleWrapper(fn)
+ }
+ if setupWrapper != nil {
+ m["Setup"] = setupWrapper(fn)
+ }
+ if teardownWrapper != nil {
+ m["Teardown"] = teardownWrapper(fn)
+ }
+
+ return m
+ }
Review Comment:
This is probably something you had in mind for helping reduce the size of
the generated code since a quick skim reads pretty static...
Do recall that if a helper doesn't need arity assistance, it may be able to
be put into a non-generated file in the same package, and tested normally.
Issue Time Tracking
-------------------
Worklog Id: (was: 762711)
Time Spent: 2.5h (was: 2h 20m)
> [Go SDK] Allow users to optimize DoFns with a single generic registration
> function
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14347
> URL: https://issues.apache.org/jira/browse/BEAM-14347
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator.
> This updates to allow them to use generics instead.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)