youngoli commented on a change in pull request #12903: URL: https://github.com/apache/beam/pull/12903#discussion_r494697535
########## File path: sdks/go/pkg/beam/core/runtime/genx/genx.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package genx is a convenience package to better support the code +// generator. It can be depended on by the user facing beam package +// and be refered to by generated code. +// +// Similarly, it can depend on beam internals and access the canonical +// method list in the graph package, or other packages to filter out +// types that aren't necessary for registration (like context.Context). +package genx + +import ( + "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" +) + +// RegisterDoFn is a convenience function for registering DoFns. +// Differs from RegisterFunction and RegisterType by introspecting +// all parameters and returns of Lifecycle methods on the dofn, +// and registers those types for you. +// +// Panics if not passed a dofn. 
+func RegisterDoFn(dofn interface{}) { + f, ts, err := registerDoFn(dofn) + if err != nil { + panic(err) + } + if f != nil { + runtime.RegisterFunction(f) + } + for _, t := range ts { + runtime.RegisterType(t) + } +} + +// registerDoFn returns all types associated with the provided DoFn. +// If passed a functional DoFn, the first return is a Function to +// register with runtime.RegisterFunction. +// The second return is all types to register with runtime.RegisterType. +// Returns an error if the passed in values are not DoFns. +func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) { + if rt, ok := dofn.(reflect.Type); ok { + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + dofn = reflect.New(rt).Interface() + } + fn, err := graph.NewFn(dofn) + if err != nil { + return nil, nil, err + } + c := cache{} + var valid bool + // Validates that this is a DoFn or combineFn. + do, err := graph.AsDoFn(fn, graph.MainUnknown) + if err == nil { + valid = true + handleDoFn(do, c) + } else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil { + valid = true + handleCombineFn(cmb, c) + } + if !valid { + // Return the DoFn specific error as that's more common. + return nil, nil, err + } + + var retFunc interface{} + rt := reflect.TypeOf(dofn) Review comment: Could this just use the `rt` from near the top of the function? It's a bit confusing right now with it named the same as a different scoped variable in the same function. ########## File path: sdks/go/pkg/beam/core/runtime/genx/genx.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package genx is a convenience package to better support the code +// generator. It can be depended on by the user facing beam package +// and be refered to by generated code. +// +// Similarly, it can depend on beam internals and access the canonical +// method list in the graph package, or other packages to filter out +// types that aren't necessary for registration (like context.Context). +package genx + +import ( + "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" +) + +// RegisterDoFn is a convenience function for registering DoFns. +// Differs from RegisterFunction and RegisterType by introspecting +// all parameters and returns of Lifecycle methods on the dofn, +// and registers those types for you. +// +// Panics if not passed a dofn. +func RegisterDoFn(dofn interface{}) { + f, ts, err := registerDoFn(dofn) + if err != nil { + panic(err) + } + if f != nil { + runtime.RegisterFunction(f) + } + for _, t := range ts { + runtime.RegisterType(t) + } +} + +// registerDoFn returns all types associated with the provided DoFn. +// If passed a functional DoFn, the first return is a Function to +// register with runtime.RegisterFunction. +// The second return is all types to register with runtime.RegisterType. +// Returns an error if the passed in values are not DoFns. 
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) { + if rt, ok := dofn.(reflect.Type); ok { + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + dofn = reflect.New(rt).Interface() Review comment: What's the reason for creating a new value of the reflect type (wrapped in an interface) for dofn here? Is it a way to reset the dofn to its default values by creating a new instance? ########## File path: sdks/go/pkg/beam/core/runtime/genx/genx.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package genx is a convenience package to better support the code +// generator. It can be depended on by the user facing beam package +// and be refered to by generated code. +// +// Similarly, it can depend on beam internals and access the canonical +// method list in the graph package, or other packages to filter out +// types that aren't necessary for registration (like context.Context). +package genx + +import ( + "reflect" + + "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" +) + +// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting +// all parameters and returns of Lifecycle methods on the dofn, +// and registers those types for you. +// +// Panics if not passed a dofn. +func RegisterDoFn(dofn interface{}) { + f, ts, err := registerDoFn(dofn) + if err != nil { + panic(err) + } + if f != nil { + runtime.RegisterFunction(f) + } + for _, t := range ts { + runtime.RegisterType(t) + } +} + +// registerDoFn returns all types associated with the provided DoFn. +// If passed a functional DoFn, the first return is a Function to +// register with runtime.RegisterFunction. +// The second return is all types to register with runtime.RegisterType. +// Returns an error if the passed in values are not DoFns. +func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) { + if rt, ok := dofn.(reflect.Type); ok { + if rt.Kind() == reflect.Ptr { + rt = rt.Elem() + } + dofn = reflect.New(rt).Interface() + } + fn, err := graph.NewFn(dofn) + if err != nil { + return nil, nil, err + } + c := cache{} + var valid bool + // Validates that this is a DoFn or combineFn. + do, err := graph.AsDoFn(fn, graph.MainUnknown) + if err == nil { + valid = true + handleDoFn(do, c) + } else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil { + valid = true + handleCombineFn(cmb, c) + } + if !valid { + // Return the DoFn specific error as that's more common. 
+ return nil, nil, err + } + + var retFunc interface{} + rt := reflect.TypeOf(dofn) + switch rt.Kind() { + case reflect.Func: + retFunc = dofn + c.regFuncTypes(rt) + default: + c.regType(rt) + } + var retTypes []reflect.Type + for _, t := range c { + retTypes = append(retTypes, t) + } + return retFunc, retTypes, nil +} + +func handleDoFn(fn *graph.DoFn, c cache) { + c.pullMethod(fn.SetupFn()) + c.pullMethod(fn.StartBundleFn()) + c.pullMethod(fn.ProcessElementFn()) + c.pullMethod(fn.FinishBundleFn()) + c.pullMethod(fn.TeardownFn()) + if !fn.IsSplittable() { + return + } + sdf := (*graph.SplittableDoFn)(fn) + c.pullMethod(sdf.CreateInitialRestrictionFn()) + c.pullMethod(sdf.CreateTrackerFn()) Review comment: Note that while restrictions need to be serializable, restriction trackers don't (and probably can't). Registering RTrackers is probably not a problem as long as nothing actually tries to use the registered type, but otherwise you can probably avoid registering RTrackers by removing this line pulling CreateTrackerFn. ########## File path: sdks/go/pkg/beam/core/runtime/symbols.go ########## @@ -72,7 +72,7 @@ type SymbolResolver interface { // RegisterFunction allows function registration. It is beneficial for performance // and is needed for functions -- such as custom coders -- serialized during unit // tests, where the underlying symbol table is not available. It should be called -// in init() only. Returns the external key for the function. +// in `init()` only. Review comment: Nitpick: Is removing the last sentence intentional? If so, should it also be removed in forward.go? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
