lostluck commented on a change in pull request #12903:
URL: https://github.com/apache/beam/pull/12903#discussion_r495104599



##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be referred to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting
+// all parameters and returns of Lifecycle methods on the dofn,
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+       f, ts, err := registerDoFn(dofn)
+       if err != nil {
+               panic(err)
+       }
+       if f != nil {
+               runtime.RegisterFunction(f)
+       }
+       for _, t := range ts {
+               runtime.RegisterType(t)
+       }
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+       if rt, ok := dofn.(reflect.Type); ok {
+               if rt.Kind() == reflect.Ptr {
+                       rt = rt.Elem()
+               }
+               dofn = reflect.New(rt).Interface()
+       }
+       fn, err := graph.NewFn(dofn)
+       if err != nil {
+               return nil, nil, err
+       }
+       c := cache{}
+       var valid bool
+       // Validates that this is a DoFn or combineFn.
+       do, err := graph.AsDoFn(fn, graph.MainUnknown)
+       if err == nil {
+               valid = true
+               handleDoFn(do, c)
+       } else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+               valid = true
+               handleCombineFn(cmb, c)
+       }
+       if !valid {
+               // Return the DoFn specific error as that's more common.
+               return nil, nil, err
+       }
+
+       var retFunc interface{}
+       rt := reflect.TypeOf(dofn)

Review comment:
       No, that rt is out of scope and it's only present if the user passed in 
a reflect.Type instance. It's scoped to the if statement. It's also nearly 30 
lines up, which hampers readability a bit by having an extended distance 
between declaration and use. This is adjacent to the uses, and that if block at 
the start ensures that there's only one way to interpret the rt variable here. 
It's the type of the dofn.
   
   Semantically, this is identical to calling all error values err. I'm always 
calling arbitrary reflect.Type instances rt.

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be referred to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting
+// all parameters and returns of Lifecycle methods on the dofn,
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+       f, ts, err := registerDoFn(dofn)
+       if err != nil {
+               panic(err)
+       }
+       if f != nil {
+               runtime.RegisterFunction(f)
+       }
+       for _, t := range ts {
+               runtime.RegisterType(t)
+       }
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+       if rt, ok := dofn.(reflect.Type); ok {
+               if rt.Kind() == reflect.Ptr {
+                       rt = rt.Elem()
+               }
+               dofn = reflect.New(rt).Interface()

Review comment:
       The graph.NewFn validator requires an actual instance of the DoFn, since 
graph.NewFn was designed for pipeline construction. This line creates such an 
instance if what is passed in is a reflect.Type.

##########
File path: sdks/go/pkg/beam/core/runtime/symbols.go
##########
@@ -72,7 +72,7 @@ type SymbolResolver interface {
 // RegisterFunction allows function registration. It is beneficial for performance
 // and is needed for functions -- such as custom coders -- serialized during unit
 // tests, where the underlying symbol table is not available. It should be called
-// in init() only. Returns the external key for the function.
+// in `init()` only.

Review comment:
       Great catch! It should be removed from forward.go! I removed it since 
this function doesn't return anything ever, so it was incorrect documentation.

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be referred to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting
+// all parameters and returns of Lifecycle methods on the dofn,
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+       f, ts, err := registerDoFn(dofn)
+       if err != nil {
+               panic(err)
+       }
+       if f != nil {
+               runtime.RegisterFunction(f)
+       }
+       for _, t := range ts {
+               runtime.RegisterType(t)
+       }
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+       if rt, ok := dofn.(reflect.Type); ok {
+               if rt.Kind() == reflect.Ptr {
+                       rt = rt.Elem()
+               }
+               dofn = reflect.New(rt).Interface()
+       }
+       fn, err := graph.NewFn(dofn)
+       if err != nil {
+               return nil, nil, err
+       }
+       c := cache{}
+       var valid bool
+       // Validates that this is a DoFn or combineFn.
+       do, err := graph.AsDoFn(fn, graph.MainUnknown)
+       if err == nil {
+               valid = true
+               handleDoFn(do, c)
+       } else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+               valid = true
+               handleCombineFn(cmb, c)
+       }
+       if !valid {
+               // Return the DoFn specific error as that's more common.
+               return nil, nil, err
+       }
+
+       var retFunc interface{}
+       rt := reflect.TypeOf(dofn)
+       switch rt.Kind() {
+       case reflect.Func:
+               retFunc = dofn
+               c.regFuncTypes(rt)
+       default:
+               c.regType(rt)
+       }
+       var retTypes []reflect.Type
+       for _, t := range c {
+               retTypes = append(retTypes, t)
+       }
+       return retFunc, retTypes, nil
+}
+
+func handleDoFn(fn *graph.DoFn, c cache) {
+       c.pullMethod(fn.SetupFn())
+       c.pullMethod(fn.StartBundleFn())
+       c.pullMethod(fn.ProcessElementFn())
+       c.pullMethod(fn.FinishBundleFn())
+       c.pullMethod(fn.TeardownFn())
+       if !fn.IsSplittable() {
+               return
+       }
+       sdf := (*graph.SplittableDoFn)(fn)
+       c.pullMethod(sdf.CreateInitialRestrictionFn())
+       c.pullMethod(sdf.CreateTrackerFn())

Review comment:
       RTs would be registered anyway since they show up in the ProcessElement 
signature. It's OK for extra types to be registered; the framework just won't 
automatically synthesize them like it needs to for other elements.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to