This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2c9344d [BEAM-7373] Migrating vet runner
new 8da20c2 Merge pull request #8632 from youngoli/beam7373
2c9344d is described below
commit 2c9344da35c867358914357dc22fd08479f05439
Author: Daniel Oliveira <[email protected]>
AuthorDate: Fri May 17 16:31:39 2019 -0700
[BEAM-7373] Migrating vet runner
This commit migrates a tool used internally at Google that
examines an inputted pipeline and outputs useful diagnostic
information to greatly improve the pipeline's performance.
This is mainly focused around making sure the user code is
properly registered and has shims generated for it, avoiding
the massive slowdown of runtime reflection.
The original author of this code is Robert Burke, with some
changes by me to get the code working here.
Co-authored-by: Robert Burke <[email protected]>
---
.../pkg/beam/runners/vet/testpipeline/functions.go | 52 ++
.../beam/runners/vet/testpipeline/testpipeline.go | 84 +++
.../runners/vet/testpipeline/testpipeline.shims.go | 190 +++++++
sdks/go/pkg/beam/runners/vet/vet.go | 596 +++++++++++++++++++++
sdks/go/pkg/beam/runners/vet/vet_test.go | 65 +++
5 files changed, 987 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/vet/testpipeline/functions.go
b/sdks/go/pkg/beam/runners/vet/testpipeline/functions.go
new file mode 100644
index 0000000..ce3d3cf
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/vet/testpipeline/functions.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testpipeline
+
+import "github.com/apache/beam/sdks/go/pkg/beam"
+
+//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
+//go:generate starcgen --package=testpipeline
--identifiers=VFn,KvFn,KvEmitFn,SCombine
+//go:generate go fmt
+
+// VFn is a do nothing example function with a k and v.
+func VFn(v int) (string, int) {
+ return "key", v
+}
+
+// KvFn is a do-nothing example function that returns its key and value
+// unchanged.
+func KvFn(k string, v int) (string, int) {
+	key, val := k, v
+	return key, val
+}
+
+// KvEmitFn is a do-nothing example function that forwards its key and value
+// to the supplied emitter rather than returning them.
+func KvEmitFn(k string, v int, emit func(string, int)) {
+	key, val := k, v
+	emit(key, val)
+}
+
+// SCombine is a do-nothing structural CombineFn used to check that shim
+// generation works for combinefn structs.
+type SCombine struct{}
+
+// MergeAccumulators is the lifecycle method that merges two accumulators by
+// adding them.
+func (s *SCombine) MergeAccumulators(a, b int) int {
+	merged := a + b
+	return merged
+}
+
+// otherMethod should never have a shim generated for it.
+// Unfortunately, short of inspecting or parsing the generated file by hand,
+// this is difficult to test.
+func (s *SCombine) otherMethod(v beam.V) beam.V {
+	out := v
+	return out
+}
diff --git a/sdks/go/pkg/beam/runners/vet/testpipeline/testpipeline.go
b/sdks/go/pkg/beam/runners/vet/testpipeline/testpipeline.go
new file mode 100644
index 0000000..3ce3c17
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/vet/testpipeline/testpipeline.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package testpipeline exports small test pipelines for testing the vet
+// runner. Shims must be generated for this package in order for tests to run
+// correctly. These shims should be regenerated if changes are made to this
+// package or to the shim generator.
+package testpipeline
+
+import (
+ "github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+// Performant constructs a performant pipeline: it only uses the functions
+// and the combiner for which this package has generated shims.
+func Performant(s beam.Scope) {
+	values := beam.Create(s, 1, 2, 3)
+	keyed := beam.ParDo(s, VFn, values)
+
+	// Feed the same keyed collection through a returning and an emitting DoFn.
+	returned := beam.ParDo(s, KvFn, keyed)
+	emitted := beam.ParDo(s, KvEmitFn, keyed)
+	merged := beam.Flatten(s, returned, emitted)
+
+	beam.CombinePerKey(s, &SCombine{}, merged)
+}
+
+// FunctionReg constructs a sub-optimal pipeline that needs function
+// registration.
+func FunctionReg(s beam.Scope) {
+	floats := beam.Create(s, float64(1), float64(2), float64(3))
+	keyed := beam.ParDo(s, VFloat64Fn, floats)
+
+	beam.CombinePerKey(s, &SCombine{}, keyed)
+}
+
+// ShimNeeded constructs a sub-optimal pipeline that needs a function shim
+// registration.
+func ShimNeeded(s beam.Scope) {
+	floats := beam.Create(s, float64(1), float64(2), float64(3))
+	keyed := beam.ParDo(s, vFloat64Fn, floats)
+
+	beam.CombinePerKey(s, &SCombine{}, keyed)
+}
+
+// TypeReg constructs a sub-optimal pipeline that needs type registration.
+func TypeReg(s beam.Scope) {
+	ints := beam.Create(s, 1, 2, 3)
+	keyed := beam.ParDo(s, VFn, ints)
+	combined := beam.CombinePerKey(s, &SCombine{}, keyed)
+
+	// toFooFn outputs the unregistered type foo.
+	beam.ParDo(s, toFooFn, combined)
+}
+
+// VFloat64Fn is an unregistered function without type shims.
+func VFloat64Fn(v float64) (string, int) {
+ return "key", 0
+}
+
+// init registers vFloat64Fn with beam.RegisterFunction, so that the function
+// itself is registered while its signature still has no generated shim.
+func init() {
+ beam.RegisterFunction(vFloat64Fn)
+}
+
+// vFloat64Fn is a registered function without type shims. It ignores its
+// input and always returns the pair ("key", 0).
+func vFloat64Fn(v float64) (string, int) {
+	key, val := "key", 0
+	return key, val
+}
+
+// foo is an unregistered, unexported user type. It is the output type of
+// toFooFn.
+type foo struct {
+ // K holds the key that toFooFn was called with.
+ K string
+ // V holds the value that toFooFn was called with.
+ V int
+}
+
+// toFooFn is an unregistered function, that uses an unregistered user type,
+// without a shim.
+func toFooFn(k string, v int) foo {
+ return foo{K: k, V: v}
+}
diff --git a/sdks/go/pkg/beam/runners/vet/testpipeline/testpipeline.shims.go
b/sdks/go/pkg/beam/runners/vet/testpipeline/testpipeline.shims.go
new file mode 100644
index 0000000..9d32d1f
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/vet/testpipeline/testpipeline.shims.go
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by starcgen. DO NOT EDIT.
+// File: testpipeline.shims.go
+
+package testpipeline
+
+import (
+ "context"
+ "reflect"
+
+ // Library imports
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+func init() {
+ runtime.RegisterFunction(KvEmitFn)
+ runtime.RegisterFunction(KvFn)
+ runtime.RegisterFunction(VFn)
+ runtime.RegisterType(reflect.TypeOf((*SCombine)(nil)).Elem())
+ reflectx.RegisterStructWrapper(reflect.TypeOf((*SCombine)(nil)).Elem(),
wrapMakerSCombine)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(int, int)
int)(nil)).Elem(), funcMakerIntIntГInt)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(int) (string,
int))(nil)).Elem(), funcMakerIntГStringInt)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(string, int, func(string,
int)))(nil)).Elem(), funcMakerStringIntEmitStringIntГ)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(string, int) (string,
int))(nil)).Elem(), funcMakerStringIntГStringInt)
+ exec.RegisterEmitter(reflect.TypeOf((*func(string, int))(nil)).Elem(),
emitMakerStringInt)
+}
+
+func wrapMakerSCombine(fn interface{}) map[string]reflectx.Func {
+ dfn := fn.(*SCombine)
+ return map[string]reflectx.Func{
+ "MergeAccumulators": reflectx.MakeFunc(func(a0 int, a1 int) int
{ return dfn.MergeAccumulators(a0, a1) }),
+ }
+}
+
+type callerIntIntГInt struct {
+ fn func(int, int) int
+}
+
+func funcMakerIntIntГInt(fn interface{}) reflectx.Func {
+ f := fn.(func(int, int) int)
+ return &callerIntIntГInt{fn: f}
+}
+
+func (c *callerIntIntГInt) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c *callerIntIntГInt) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c *callerIntIntГInt) Call(args []interface{}) []interface{} {
+ out0 := c.fn(args[0].(int), args[1].(int))
+ return []interface{}{out0}
+}
+
+func (c *callerIntIntГInt) Call2x1(arg0, arg1 interface{}) interface{} {
+ return c.fn(arg0.(int), arg1.(int))
+}
+
+type callerIntГStringInt struct {
+ fn func(int) (string, int)
+}
+
+func funcMakerIntГStringInt(fn interface{}) reflectx.Func {
+ f := fn.(func(int) (string, int))
+ return &callerIntГStringInt{fn: f}
+}
+
+func (c *callerIntГStringInt) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c *callerIntГStringInt) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c *callerIntГStringInt) Call(args []interface{}) []interface{} {
+ out0, out1 := c.fn(args[0].(int))
+ return []interface{}{out0, out1}
+}
+
+func (c *callerIntГStringInt) Call1x2(arg0 interface{}) (interface{},
interface{}) {
+ return c.fn(arg0.(int))
+}
+
+type callerStringIntEmitStringIntГ struct {
+ fn func(string, int, func(string, int))
+}
+
+func funcMakerStringIntEmitStringIntГ(fn interface{}) reflectx.Func {
+ f := fn.(func(string, int, func(string, int)))
+ return &callerStringIntEmitStringIntГ{fn: f}
+}
+
+func (c *callerStringIntEmitStringIntГ) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c *callerStringIntEmitStringIntГ) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c *callerStringIntEmitStringIntГ) Call(args []interface{}) []interface{}
{
+ c.fn(args[0].(string), args[1].(int), args[2].(func(string, int)))
+ return []interface{}{}
+}
+
+func (c *callerStringIntEmitStringIntГ) Call3x0(arg0, arg1, arg2 interface{}) {
+ c.fn(arg0.(string), arg1.(int), arg2.(func(string, int)))
+}
+
+type callerStringIntГStringInt struct {
+ fn func(string, int) (string, int)
+}
+
+func funcMakerStringIntГStringInt(fn interface{}) reflectx.Func {
+ f := fn.(func(string, int) (string, int))
+ return &callerStringIntГStringInt{fn: f}
+}
+
+func (c *callerStringIntГStringInt) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c *callerStringIntГStringInt) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c *callerStringIntГStringInt) Call(args []interface{}) []interface{} {
+ out0, out1 := c.fn(args[0].(string), args[1].(int))
+ return []interface{}{out0, out1}
+}
+
+func (c *callerStringIntГStringInt) Call2x2(arg0, arg1 interface{})
(interface{}, interface{}) {
+ return c.fn(arg0.(string), arg1.(int))
+}
+
+type emitNative struct {
+ n exec.ElementProcessor
+ fn interface{}
+
+ ctx context.Context
+ ws []typex.Window
+ et typex.EventTime
+ value exec.FullValue
+}
+
+func (e *emitNative) Init(ctx context.Context, ws []typex.Window, et
typex.EventTime) error {
+ e.ctx = ctx
+ e.ws = ws
+ e.et = et
+ return nil
+}
+
+func (e *emitNative) Value() interface{} {
+ return e.fn
+}
+
+func emitMakerStringInt(n exec.ElementProcessor) exec.ReusableEmitter {
+ ret := &emitNative{n: n}
+ ret.fn = ret.invokeStringInt
+ return ret
+}
+
+func (e *emitNative) invokeStringInt(key string, val int) {
+ e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: key,
Elm2: val}
+ if err := e.n.ProcessElement(e.ctx, &e.value); err != nil {
+ panic(err)
+ }
+}
+
+// DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/runners/vet/vet.go
b/sdks/go/pkg/beam/runners/vet/vet.go
new file mode 100644
index 0000000..a700a80
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/vet/vet.go
@@ -0,0 +1,596 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package vet is a Beam runner that "runs" a pipeline by producing
+// generated code to avoid symbol table lookups and reflection in pipeline
+// execution.
+//
+// This runner isn't necessarily intended to be run by itself. Other runners
+// can use this as a sanity check on whether a given pipeline avoids known
+// performance bottlenecks.
+//
+// TODO(BEAM-7374): Add usage documentation.
+package vet
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "reflect"
+ "strings"
+ "unicode"
+ "unicode/utf8"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/util/shimx"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+// init registers Execute with beam under the runner name "vet".
+func init() {
+ beam.RegisterRunner("vet", Execute)
+}
+
+// disabledResolver is a symbol resolver whose lookups always fail. We want
+// clear failures when looking up symbols so we can tell whether something has
+// been registered properly or not.
+type disabledResolver bool
+
+// Sym2Addr never resolves a symbol: it always returns a zero address and an
+// error that names the symbol that was looked up.
+func (disabledResolver) Sym2Addr(name string) (uintptr, error) {
+	err := fmt.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
+	return 0, err
+}
+
+// Execute evaluates the pipeline on whether it can run without reflection.
+func Execute(ctx context.Context, p *beam.Pipeline) error {
+ e, err := Evaluate(ctx, p)
+ if err != nil {
+ return err
+ }
+ if !e.Performant() {
+ e.summary()
+ e.Generate("main")
+ e.diag("*/\n")
+ return fmt.Errorf("pipeline is not performant, see diagnostic
summary:\n%s\n%s", string(e.d.Bytes()), string(e.Bytes()))
+ }
+ // Pipeline nas no further tasks.
+ return nil
+}
+
+// Evaluate returns an object that can generate necessary shims and inits.
+//
+// While the pipeline is being built and audited, runtime.Resolver is replaced
+// by a resolver that fails every lookup; the previous resolver is restored
+// before Evaluate returns. If the pipeline cannot be built, the returned
+// error includes the underlying build error.
+func Evaluate(_ context.Context, p *beam.Pipeline) (*Eval, error) {
+	// Disable the resolver so we can see which functions are already
+	// registered.
+	r := runtime.Resolver
+	runtime.Resolver = disabledResolver(false)
+	// Reinstate the resolver when we're through.
+	defer func() { runtime.Resolver = r }()
+
+	edges, _, err := p.Build()
+	if err != nil {
+		// Keep the cause in the message instead of silently dropping it.
+		return nil, errors.New("can't get data to generate: " + err.Error())
+	}
+
+	e := newEval()
+
+	// Open the comment block that wraps the diagnostic output.
+	e.diag("/**\n")
+	e.extractFromMultiEdges(edges)
+	return e, nil
+}
+
+// newEval returns an Eval with all of its lookup maps allocated and with
+// allExported initially set to true.
+func newEval() *Eval {
+	e := new(Eval)
+	e.functions = map[string]*funcx.Fn{}
+	e.types = map[string]reflect.Type{}
+	e.funcs = map[string]reflect.Type{}
+	e.emits = map[string]reflect.Type{}
+	e.iters = map[string]reflect.Type{}
+	e.imports = map[string]struct{}{}
+	e.allExported = true
+	return e
+}
+
+// Eval contains and uniquifies the cache of types and things that need to be
+// generated.
+type Eval struct {
+ // d is a buffer for the diagnostic information produced when evaluating
+ // the pipeline.
+ // w is the primary output buffer.
+ d, w bytes.Buffer
+
+ // Register and uniquify the needed shims for each kind.
+ // Functions to Register
+ functions map[string]*funcx.Fn
+ // Types to Register (structs, essentially)
+ types map[string]reflect.Type
+ // FuncShims needed
+ funcs map[string]reflect.Type
+ // Emitter Shims needed
+ emits map[string]reflect.Type
+ // Iterator Shims needed
+ iters map[string]reflect.Type
+
+ // list of packages we need to import.
+ imports map[string]struct{}
+
+ // allExported marks if all ptransforms are exported and available in main.
+ allExported bool
+}
+
+// extractFromMultiEdges audits the given pipeline edges so we can determine if
+// this pipeline will run without reflection.
+func (e *Eval) extractFromMultiEdges(edges []*graph.MultiEdge) {
+ e.diag("PTransform Audit:\n")
+ for _, edge := range edges {
+ switch edge.Op {
+ case graph.ParDo:
+ // Gets the ParDo's identifier
+ e.diagf("pardo %s", edge.Name())
+ e.extractGraphFn((*graph.Fn)(edge.DoFn))
+ case graph.Combine:
+ e.diagf("combine %s", edge.Name())
+ e.extractGraphFn((*graph.Fn)(edge.CombineFn))
+ default:
+ continue
+ }
+ e.diag("\n")
+ }
+}
+
+// Performant returns whether this pipeline can avoid reflection and symbol
+// lookups at runtime, i.e. it needs no additional registrations and uses no
+// default reflection shims.
+func (e *Eval) Performant() bool {
+	if e.RequiresRegistrations() {
+		return false
+	}
+	return !e.UsesDefaultReflectionShims()
+}
+
+// RequiresRegistrations returns whether there are any types or functions
+// that require registrations.
+func (e *Eval) RequiresRegistrations() bool {
+	return len(e.functions) > 0 || len(e.types) > 0
+}
+
+// UsesDefaultReflectionShims returns whether the default reflection shims
+// are going to be used by the pipeline, i.e. whether any function, emitter
+// or iterator shim is still missing.
+func (e *Eval) UsesDefaultReflectionShims() bool {
+	for _, missing := range []int{len(e.funcs), len(e.emits), len(e.iters)} {
+		if missing > 0 {
+			return true
+		}
+	}
+	return false
+}
+
+// AllExported returns whether all values in the pipeline are exported,
+// and thus it may be possible to patch the pipeline's package with
+// generated shims.
+// Using exported vs unexported identifiers does not affect pipeline
+// performance, but it does determine whether the pipeline's package can do
+// anything about it.
+func (e *Eval) AllExported() bool {
+ return e.allExported
+}
+
+func (e *Eval) summary() {
+ e.diag("\n")
+ e.diag("Summary\n")
+ e.diagf("All exported?: %v\n", e.AllExported())
+ e.diagf("%d\t Imports\n", len(e.imports))
+ e.diagf("%d\t Functions\n", len(e.functions))
+ e.diagf("%d\t Types\n", len(e.types))
+ e.diagf("%d\t Shims\n", len(e.funcs))
+ e.diagf("%d\t Emits\n", len(e.emits))
+ e.diagf("%d\t Inputs\n", len(e.iters))
+
+ if e.Performant() {
+ e.diag("Pipeline is performant!\n")
+ } else {
+ e.diag("Pipeline is not performant:\n")
+ if e.RequiresRegistrations() {
+ e.diag("\trequires additional type or function
registration\n")
+ }
+ if e.UsesDefaultReflectionShims() {
+ e.diag("\trequires additional shim generation\n")
+ }
+ if e.AllExported() {
+ e.diag("\tGood News! All identifiers are exported; the
pipeline's package can be patched with generated output.\n")
+ }
+ }
+}
+
+// NameType turns a reflect.Type into a string based on its name.
+// The result is prefixed with Emit or Iter if the type satisfies the
+// constraints of an emitter or an iterator function respectively.
+func NameType(t reflect.Type) string {
+	emt, isEmit := makeEmitter(t)
+	if isEmit {
+		return "Emit" + emt.Name
+	}
+	ipt, isIter := makeInput(t)
+	if isIter {
+		return "Iter" + ipt.Name
+	}
+	return shimx.Name(t.String())
+}
+
+// Generate produces a go file under the given package.
+//
+// Everything collected during evaluation (functions, types, function shims,
+// emitters, iterators and imports) is gathered into a shimx.Top and rendered
+// into the primary output buffer with shimx.File. A listing of each entry is
+// written to the diagnostic buffer along the way.
+//
+// Generate panics if an entry recorded as an emitter or an iterator no longer
+// unfolds as one.
+func (e *Eval) Generate(packageName string) {
+	// Here's where we shove everything into the Top template type.
+	// Need to swap in typex.* for beam.* where appropriate.
+	e.diag("Diagnostic output pre-amble for the code generator\n")
+
+	e.diag("Functions\n")
+	var functions []string
+	for fn, t := range e.functions {
+		e.diagf("%s, %v\n", fn, t)
+		functions = append(functions, trimMainQualifier(fn))
+	}
+	e.diag("Types\n")
+	var types []string
+	for fn, t := range e.types {
+		e.diagf("%s, %v\n", fn, t)
+		types = append(types, trimMainQualifier(fn))
+	}
+	e.diag("Shims\n")
+	var shims []shimx.Func
+	for fn, t := range e.funcs {
+		e.diagf("%s, %v\n", fn, t)
+		shim := shimx.Func{Type: t.String()}
+		var inNames []string
+		for i := 0; i < t.NumIn(); i++ {
+			s := t.In(i)
+			shim.In = append(shim.In, s.String())
+			inNames = append(inNames, NameType(s))
+		}
+		var outNames []string
+		for i := 0; i < t.NumOut(); i++ {
+			s := t.Out(i)
+			shim.Out = append(shim.Out, s.String())
+			outNames = append(outNames, NameType(s))
+		}
+		shim.Name = shimx.FuncName(inNames, outNames)
+		shims = append(shims, shim)
+	}
+	e.diag("Emitters\n")
+	var emitters []shimx.Emitter
+	for k, t := range e.emits {
+		e.diagf("%s, %v\n", k, t)
+		emt, ok := makeEmitter(t)
+		if !ok {
+			panic(fmt.Sprintf("%v is not an emit, but we expected it to be one.", t))
+		}
+		emitters = append(emitters, emt)
+	}
+	e.diag("Iterators \n")
+	var inputs []shimx.Input
+	for ipt, t := range e.iters {
+		e.diagf("%s, %v\n", ipt, t)
+		itr, ok := makeInput(t)
+		if !ok {
+			// Report the kind that was actually expected here: an iterator.
+			panic(fmt.Sprintf("%v is not an iter, but we expected it to be one.", t))
+		}
+		inputs = append(inputs, itr)
+	}
+	var imports []string
+	for k := range e.imports {
+		// An empty package path cannot be imported.
+		if k == "" {
+			continue
+		}
+		imports = append(imports, k)
+	}
+
+	top := shimx.Top{
+		Package:   packageName,
+		Imports:   imports,
+		Functions: functions,
+		Types:     types,
+		Shims:     shims,
+		Emitters:  emitters,
+		Inputs:    inputs,
+	}
+	shimx.File(&e.w, &top)
+}
+
+// trimMainQualifier drops the package qualifier from identifiers that live in
+// the main package, since generated code in main does not need it. Any other
+// name is returned unchanged.
+func trimMainQualifier(name string) string {
+	parts := strings.Split(name, ".")
+	// The length check keeps a bare "main" from indexing out of range.
+	if parts[0] == "main" && len(parts) > 1 {
+		return parts[1]
+	}
+	return name
+}
+
+func makeEmitter(t reflect.Type) (shimx.Emitter, bool) {
+ types, isEmit := funcx.UnfoldEmit(t)
+ if !isEmit {
+ return shimx.Emitter{}, false
+ }
+ emt := shimx.Emitter{Type: t.String()}
+ switch len(types) {
+ case 1:
+ emt.Time = false
+ emt.Val = types[0].String()
+ case 2:
+ if types[0] == typex.EventTimeType {
+ emt.Time = true
+ } else {
+ emt.Key = types[0].String()
+ }
+ emt.Val = types[1].String()
+ case 3:
+ // If there's 3, the first one must be typex.EvalentTime.
+ emt.Time = true
+ emt.Key = types[1].String()
+ emt.Val = types[2].String()
+ }
+ if emt.Time {
+ emt.Name = fmt.Sprintf("ET%s%s", shimx.Name(emt.Key),
shimx.Name(emt.Val))
+ } else {
+ emt.Name = fmt.Sprintf("%s%s", shimx.Name(emt.Key),
shimx.Name(emt.Val))
+ }
+ return emt, true
+}
+
+func makeInput(t reflect.Type) (shimx.Input, bool) {
+ itr := shimx.Input{Type: t.String()}
+ types, isIter := funcx.UnfoldIter(t)
+ if !isIter {
+ return shimx.Input{}, false
+ }
+ switch len(types) {
+ case 1:
+ itr.Time = false
+ itr.Val = types[0].String()
+ case 2:
+ if types[0] == typex.EventTimeType {
+ itr.Time = true
+ } else {
+ itr.Key = types[0].String()
+ }
+ itr.Val = types[1].String()
+ case 3:
+ // If there's 3, the first one must be typex.EventTime.
+ itr.Time = true
+ itr.Key = types[1].String()
+ itr.Val = types[2].String()
+ }
+ if itr.Time {
+ itr.Name = fmt.Sprintf("ET%s%s", shimx.Name(itr.Key),
shimx.Name(itr.Val))
+ } else {
+ itr.Name = fmt.Sprintf("%s%s", shimx.Name(itr.Key),
shimx.Name(itr.Val))
+ }
+ return itr, true
+}
+
+// needFunction marks that the function itself needs to be registered, unless
+// a function of the same name has already been recorded.
+func (e *Eval) needFunction(fn *funcx.Fn) {
+	name := fn.Fn.Name()
+	if _, seen := e.functions[name]; seen {
+		e.diag(" FUNCTION_COVERED")
+		return
+	}
+	e.diag(" NEED_FUNCTION") // Needs a RegisterFunction
+	e.functions[name] = fn
+	e.needImport(name)
+}
+
+// needImport registers the given identifier's import for inclusion in
+// generation.
+//
+// p is a qualified identifier of the form "package/path.Name". Nothing is
+// recorded when p has no package qualifier, when the package is main, or when
+// p is reflect.methodValueCall. If the identifier is not exported, the
+// evaluation is marked as not all-exported and no import is recorded.
+func (e *Eval) needImport(p string) {
+	// If this is a reflect.methodValueCall, this is covered by the type
+	// check already, so we don't need to do anything.
+	if p == "reflect.methodValueCall" {
+		return
+	}
+	// Split at last '.' to get full package name and identifier name.
+	splitInd := strings.LastIndexByte(p, '.')
+	if splitInd < 0 {
+		// No qualifier at all: there is nothing to import, and slicing with
+		// the -1 index below would panic.
+		return
+	}
+	pp := p[:splitInd]
+
+	// If it's ad-hoc, or in main we can't/won't import it.
+	if pp == "main" || pp == "" {
+		return
+	}
+
+	// Check if the identifier is exported
+	r, _ := utf8.DecodeRuneInString(p[splitInd+1:])
+	if !unicode.IsUpper(r) {
+		e.allExported = false
+		return
+	}
+	e.imports[pp] = struct{}{}
+	e.diagf("\n%s\n", pp)
+}
+
+// needShim marks the function's type signature as needing to be specialized,
+// unless the same signature has already been recorded.
+func (e *Eval) needShim(fn *funcx.Fn) {
+	sig := fn.Fn.Type()
+	key := sig.String()
+	if _, seen := e.funcs[key]; seen {
+		e.diag(" SHIM_COVERED")
+		return
+	}
+	e.diag(" NEED_SHIM") // Needs a RegisterFunc
+	e.funcs[key] = sig
+	e.needImport(fn.Fn.Name())
+}
+
+// needType marks the struct type as needing to be registered under key k,
+// unless that key has already been recorded.
+func (e *Eval) needType(k string, rt reflect.Type) {
+	if _, seen := e.types[k]; seen {
+		e.diag(" OK")
+		return
+	}
+	e.diag(" NEED_TYPE") // Needs a RegisterType
+	e.types[k] = rt
+	e.needImport(k)
+}
+
+// needEmit marks the emit parameter type as needing specialization, unless
+// an emitter is already registered for it or it has already been recorded.
+func (e *Eval) needEmit(rt reflect.Type) {
+	if exec.IsEmitterRegistered(rt) {
+		e.diag(" OK")
+		return
+	}
+	key := fmt.Sprintf("%v", rt)
+	if _, seen := e.emits[key]; seen {
+		e.diag(" EMIT_COVERED")
+		return
+	}
+	e.diagf(" NEED_EMIT[%v]", rt) // Needs a RegisterEmit
+	e.emits[key] = rt
+}
+
+// needInput marks the iterator parameter type as needing specialization,
+// unless an input is already registered for it or it has already been
+// recorded.
+func (e *Eval) needInput(rt reflect.Type) {
+	if exec.IsInputRegistered(rt) {
+		e.diag(" OK")
+		return
+	}
+	key := fmt.Sprintf("%v", rt)
+	if _, seen := e.iters[key]; seen {
+		e.diag(" INPUT_COVERED")
+		return
+	}
+	e.diagf(" NEED_INPUT[%v]", rt) // Needs a RegisterInput
+	e.iters[key] = rt
+}
+
+// diag appends s, unformatted, to the diagnostic buffer.
+func (e *Eval) diag(s string) {
+	e.d.WriteString(s)
+}
+
+// diagf formats its arguments according to f and appends the result to the
+// diagnostic buffer.
+func (e *Eval) diagf(f string, args ...interface{}) {
+	e.d.WriteString(fmt.Sprintf(f, args...))
+}
+
+// Print appends s, unformatted, to the Eval buffer.
+func (e *Eval) Print(s string) {
+	e.w.WriteString(s)
+}
+
+// Printf formats its arguments according to f and appends the result to the
+// Eval buffer.
+func (e *Eval) Printf(f string, args ...interface{}) {
+	e.w.WriteString(fmt.Sprintf(f, args...))
+}
+
+// Bytes returns the Eval buffer's bytes for file writing. This is the
+// content of w, the primary output buffer that Generate, Print and Printf
+// write to; the diagnostic buffer is not included.
+func (e *Eval) Bytes() []byte {
+ return e.w.Bytes()
+}
+
+// We need to take graph.Fns (which can be created from interface{} by
+// graph.NewFn) and convert them to all needed function caller signatures
+// and emitters.
+//
+// The type assertion shim Funcs need to be registered with
+// reflectx.RegisterFunc.
+// Emitters need to be registered with exec.RegisterEmitter.
+// Iterators need to be registered with exec.RegisterInput.
+// The types need to be registered with beam.RegisterType.
+// The user functions need to be registered with beam.RegisterFunction.
+//
+// Registrations are all on the concrete element type, rather than the
+// pointer type.
+
+// extractGraphFn does the analysis of the function and determines what things
need generating.
+// A single line is used, unless it's a struct, at which point one line per
implemented method
+// is used.
+func (e *Eval) extractGraphFn(fn *graph.Fn) {
+ if fn.DynFn != nil {
+ // TODO(BEAM-7375) handle dynamics if necessary (probably not
since it's got general function handling)
+ e.diag(" dynamic function")
+ return
+ }
+ if fn.Recv != nil {
+ e.diagf(" struct[[%T]]", fn.Recv)
+
+ rt := reflectx.SkipPtr(reflect.TypeOf(fn.Recv)) // We need the
value not the pointer that's used.
+ if tk, ok := runtime.TypeKey(rt); ok {
+ if t, found := runtime.LookupType(tk); !found {
+ e.needType(tk, rt)
+ } else {
+ e.diagf(" FOUND %v", t) // Doesn't need a
RegisterType
+ }
+ } else {
+ e.diagf(" CANT REGISTER %v %v %v", rt, rt.PkgPath(),
rt.Name())
+ }
+ e.extractFromDoFn((*graph.DoFn)(fn))
+ e.extractFromCombineFn((*graph.CombineFn)(fn))
+ }
+
+ if fn.Fn != nil {
+ // This goes here since methods don't need registering. That's
handled by the type.
+ f := fn.Fn.Fn
+ if _, err := runtime.ResolveFunction(f.Name(), f.Type()); err
!= nil {
+ e.needFunction(fn.Fn) // Need a RegisterFunction
+ }
+ e.extractFuncxFn(fn.Fn)
+ }
+}
+
+// mthd pairs an accessor for one lifecycle method of a structural fn with
+// the label used for that method in the diagnostic output.
+type mthd struct {
+ // m returns the method's *funcx.Fn; extractMethods skips the entry when
+ // it returns nil.
+ m func() *funcx.Fn
+ // name is the label written to the diagnostics for this method.
+ name string
+}
+
+// extractFromCombineFn audits each CombineFn lifecycle method that cmbfn
+// provides.
+func (e *Eval) extractFromCombineFn(cmbfn *graph.CombineFn) {
+	e.extractMethods([]mthd{
+		{m: cmbfn.SetupFn, name: "SetupFn"},
+		{m: cmbfn.CreateAccumulatorFn, name: "CreateAccumulatorFn"},
+		{m: cmbfn.AddInputFn, name: "AddInputFn"},
+		{m: cmbfn.MergeAccumulatorsFn, name: "MergeAccumulatorsFn"},
+		{m: cmbfn.ExtractOutputFn, name: "ExtractOutputFn"},
+		{m: cmbfn.CompactFn, name: "CompactFn"},
+		{m: cmbfn.TeardownFn, name: "TeardownFn"},
+	})
+}
+
+// extractFromDoFn audits each DoFn lifecycle method that dofn provides.
+func (e *Eval) extractFromDoFn(dofn *graph.DoFn) {
+	e.extractMethods([]mthd{
+		{m: dofn.SetupFn, name: "SetupFn"},
+		{m: dofn.StartBundleFn, name: "StartBundleFn"},
+		{m: dofn.ProcessElementFn, name: "ProcessElementFn"},
+		{m: dofn.FinishBundleFn, name: "FinishBundleFn"},
+		{m: dofn.TeardownFn, name: "TeardownFn"},
+	})
+}
+
+// extractMethods audits every listed method that is actually present,
+// starting a new indented diagnostic line labelled with the method's name
+// for each one. Entries whose accessor returns nil are skipped.
+func (e *Eval) extractMethods(methods []mthd) {
+	for _, cand := range methods {
+		mfn := cand.m()
+		if mfn == nil {
+			continue
+		}
+		e.diag("\n\t- " + cand.name)
+		e.extractFuncxFn(mfn)
+	}
+}
+
+// extractFuncxFn writes everything to the same line, marking things as
+// registered or not as needed.
+func (e *Eval) extractFuncxFn(fn *funcx.Fn) {
+	e.diagf(" function[[%v]]", fn.Fn.Type())
+
+	// We don't have access to the registration maps directly, so we can
+	// sanity check if we need a shim by checking whether the function is
+	// backed by this type.
+	if fmt.Sprintf("%T", fn.Fn) == "*reflectx.reflectFunc" {
+		e.needShim(fn) // Need a generated Shim and RegisterFunc
+	}
+
+	// Need to extract emitter types and iterator types for specialization.
+	for _, param := range fn.Param {
+		switch param.Kind {
+		case funcx.FnEmit:
+			e.needEmit(param.T) // Need a generated emitter and RegisterEmitter
+		case funcx.FnIter, funcx.FnReIter:
+			// Need a generated iter and RegisterInput. Re-iterables are
+			// handled like iterators, which might be unnecessary.
+			e.needInput(param.T)
+		}
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/vet/vet_test.go
b/sdks/go/pkg/beam/runners/vet/vet_test.go
new file mode 100644
index 0000000..dc7076a
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/vet/vet_test.go
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vet
+
+import (
+ "context"
+ "github.com/apache/beam/sdks/go/pkg/beam/runners/vet/testpipeline"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+// TestEvaluate evaluates each test pipeline and checks the verdicts that the
+// resulting Eval reports. For pipelines expected to be performant only
+// Performant is checked.
+func TestEvaluate(t *testing.T) {
+	type testCase struct {
+		name                string
+		c                   func(beam.Scope)
+		perf, exp, ref, reg bool
+	}
+	tests := []testCase{
+		{name: "Performant", c: testpipeline.Performant, perf: true},
+		{name: "FunctionReg", c: testpipeline.FunctionReg, exp: true, ref: true, reg: true},
+		{name: "ShimNeeded", c: testpipeline.ShimNeeded, ref: true},
+		{name: "TypeReg", c: testpipeline.TypeReg, ref: true, reg: true},
+	}
+	for _, tc := range tests {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			tc.c(s)
+			e, err := Evaluate(context.Background(), p)
+			if err != nil {
+				t.Fatalf("failed to evaluate testpipeline.Pipeline: %v", err)
+			}
+			if got := e.Performant(); got != tc.perf {
+				t.Fatalf("e.Performant() = %v, want %v", got, tc.perf)
+			}
+			// Abort early for performant pipelines.
+			if tc.perf {
+				return
+			}
+			e.summary()
+			if got := e.AllExported(); got != tc.exp {
+				t.Errorf("e.AllExported() = %v, want %v", got, tc.exp)
+			}
+			if got := e.RequiresRegistrations(); got != tc.reg {
+				t.Errorf("e.RequiresRegistrations() = %v, want %v\n%v", got, tc.reg, e.d.String())
+			}
+			if got := e.UsesDefaultReflectionShims(); got != tc.ref {
+				t.Errorf("e.UsesDefaultReflectionShims() = %v, want %v", got, tc.ref)
+			}
+		})
+	}
+}