lostluck commented on a change in pull request #13940:
URL: https://github.com/apache/beam/pull/13940#discussion_r577233304
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -102,52 +159,104 @@ func (r *Registry) registerType(ut reflect.Type, seen
map[reflect.Type]struct{})
if lID, ok := r.logicalTypeIdentifiers[t]; ok {
lt := r.logicalTypes[lID]
r.addToMaps(lt.StorageType(), t)
- return
+ return nil
}
- for _, lti := range r.logicalTypeInterfaces {
- if !t.Implements(lti) {
- continue
- }
+
+ useProvider := func(t, lti reflect.Type) (bool, error) {
p := r.logicalTypeProviders[lti]
st, err := p(t)
if err != nil {
- panic(errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v", t, lti))
+ return false, errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v", t, lti)
}
if st == nil {
- continue
+ return false, nil
}
r.RegisterLogicalType(ToLogicalType(t.String(), t, st))
r.addToMaps(st, t)
- return
+ return true, nil
+ }
+
+ for _, lti := range r.logicalTypeInterfaces {
+ if t.Implements(lti) {
+ ok, err := useProvider(t, lti)
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ }
+ if t := reflect.PtrTo(t); t.Implements(lti) {
+ ok, err := useProvider(t, lti)
+ if err != nil {
+ return err
+ }
+ if ok {
+ return nil
+ }
+ }
}
switch t.Kind() {
case reflect.Map:
- r.registerType(t.Key(), seen)
+ if err := r.registerType(t.Key(), seen); err != nil {
+ return err
+ }
fallthrough
case reflect.Array, reflect.Slice, reflect.Ptr:
- r.registerType(t.Elem(), seen)
- return
+ if err := r.registerType(t.Elem(), seen); err != nil {
+ return errors.Wrapf(err, "type is of kind %v", t.Kind())
+ }
+ return nil
+ case reflect.Interface, reflect.Func, reflect.Chan, reflect.Invalid,
reflect.UnsafePointer, reflect.Uintptr:
+ // Ignore these, as they can't be serialized.
+ return nil
+ case reflect.Complex64, reflect.Complex128:
+ // TODO(BEAM-9615): Support complex number types.
+ return nil
case reflect.Struct: // What we expect here.
default:
- return
+ rt := reflectKindToTypeMap[t.Kind()]
+ // It's only a logical type if it's not a built in primitive
type.
+ if t != rt {
+ st, ok := reflectKindToTypeMap[t.Kind()]
+ if !ok {
+ fmt.Printf("\n nil type to RegisterLogicalType
for t %v\n", t)
+ return nil
+ }
+ r.RegisterLogicalType(ToLogicalType(t.String(), t, st))
+ }
+ return nil
}
runtime.RegisterType(ut)
for i := 0; i < t.NumField(); i++ {
sf := ut.Field(i)
- r.registerType(sf.Type, seen)
+ isUnexported := sf.PkgPath != ""
+ if isUnexported {
+ // Schemas can't handle unexported fields at all.
+ continue
+ }
+ if err := r.registerType(sf.Type, seen); err != nil {
+ return errors.Wrapf(err, "registering type for field %v
in %v", sf.Name, ut)
+ }
+ if implements(sf.Type, sdfRtrackerType) {
Review comment:
Technically it happens with RegisterDoFn when RegisterSchema is
included. I'm still working out the bugs, and am going with the heavy-handed
approach for now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org