youngoli commented on a change in pull request #14192:
URL: https://github.com/apache/beam/pull/14192#discussion_r593553104
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type)
(*pipepb.Schema, error) {
return r.fromType(ot)
}
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType,
string, error) {
+ // Check if a logical type was registered that matches this struct type
directly
+ // and if so, extract the schema from it for use.
+ if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+ lt := r.logicalTypes[lID]
+ ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+ if err != nil {
+ return nil, "", errors.Wrapf(err, "unable to convert
LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID,
lt.StorageType(), lt.GoType())
+ }
+ return ftype, lID, nil
+ }
+ for _, lti := range r.logicalTypeInterfaces {
+ if !t.Implements(lti) {
+ continue
+ }
+ p := r.logicalTypeProviders[lti]
+ st, err := p(t)
+ if err != nil {
+ return nil, "", errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v schema field", t, lti)
+ }
+ if st == nil {
+ continue
+ }
+ ftype, err := r.reflectTypeToFieldType(st)
+ if err != nil {
+ return nil, "", errors.Wrapf(err, "unable to convert
LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface",
st, t)
+ }
+ return ftype, t.String(), nil
+ }
+ return nil, "", nil
+}
+
func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
- if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
- return nil, errors.Errorf("cannot convert %v to schema.
FromType only converts structs to schemas", ot)
+ if schm, ok := r.typeToSchema[ot]; ok {
+ return schm, nil
}
- schm, err := r.structToSchema(ot)
+ ftype, lID, err := r.logicalTypeToFieldType(ot)
if err != nil {
return nil, err
}
- if ot.Kind() == reflect.Ptr {
- schm.Options = append(schm.Options, &pipepb.Option{
- Name: optGoNillable,
- })
+ if ftype != nil {
Review comment:
I notice the beginning of `structToSchema` is very similar to this
section of `fromType`, and `structToSchema` is called right after this section
anyway. Is there a reason this logic needs to be here instead of just calling
`structToSchema` right away? (If so, it might be good to leave a comment
explaining why.)
##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -114,16 +114,38 @@ func (b *RowDecoderBuilder) decoderForType(t
reflect.Type) (func(io.Reader) (int
// decoderForStructReflect returns a reflection based decoder function for the
// given struct type.
func (b *RowDecoderBuilder) decoderForStructReflect(t reflect.Type)
(func(reflect.Value, io.Reader) error, error) {
-
var coder typeDecoderReflect
coder.typ = t
for i := 0; i < t.NumField(); i++ {
i := i // avoid alias issues in the closures.
sf := t.Field(i)
isUnexported := sf.PkgPath != ""
- if isUnexported {
+ if sf.Anonymous {
+ ft := sf.Type
+ if ft.Kind() == reflect.Ptr {
+ // If a struct embeds a pointer to an
unexported type,
+ // it is not possible to set a newly allocated
value
+ // since the field is unexported.
+ //
+ // See https://golang.org/issue/21357
+ //
+ // Since the values are created by this package
reflectively,
+ // there's no work around like pre-allocating
the field
+ // manually.
+ if isUnexported {
Review comment:
Does `isUnexported` describe whether the field itself is unexported, or
whether the type of the field is unexported? I thought it was the former, but
based on this line and the comment you added at line 144, it seems to be the
latter. Is that right?
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type)
(*pipepb.Schema, error) {
return r.fromType(ot)
}
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType,
string, error) {
+ // Check if a logical type was registered that matches this struct type
directly
+ // and if so, extract the schema from it for use.
+ if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+ lt := r.logicalTypes[lID]
+ ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+ if err != nil {
+ return nil, "", errors.Wrapf(err, "unable to convert
LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID,
lt.StorageType(), lt.GoType())
+ }
+ return ftype, lID, nil
+ }
+ for _, lti := range r.logicalTypeInterfaces {
+ if !t.Implements(lti) {
+ continue
+ }
+ p := r.logicalTypeProviders[lti]
+ st, err := p(t)
+ if err != nil {
+ return nil, "", errors.Wrapf(err, "unable to convert
LogicalType[%v] using provider for %v schema field", t, lti)
+ }
+ if st == nil {
+ continue
+ }
+ ftype, err := r.reflectTypeToFieldType(st)
+ if err != nil {
+ return nil, "", errors.Wrapf(err, "unable to convert
LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface",
st, t)
+ }
+ return ftype, t.String(), nil
+ }
+ return nil, "", nil
+}
+
func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
- if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
- return nil, errors.Errorf("cannot convert %v to schema.
FromType only converts structs to schemas", ot)
+ if schm, ok := r.typeToSchema[ot]; ok {
+ return schm, nil
}
- schm, err := r.structToSchema(ot)
+ ftype, lID, err := r.logicalTypeToFieldType(ot)
if err != nil {
return nil, err
}
- if ot.Kind() == reflect.Ptr {
- schm.Options = append(schm.Options, &pipepb.Option{
- Name: optGoNillable,
- })
+ if ftype != nil {
+ schm := ftype.GetRowType().GetSchema()
+ schm = proto.Clone(schm).(*pipepb.Schema)
+ if ot.Kind() == reflect.Ptr {
+ schm.Options = append(schm.Options, &pipepb.Option{
+ Name: optGoNillable,
+ })
+ }
+ if lID != "" {
+ schm.Options = append(schm.Options, logicalOption(lID))
+ }
+ schm.Id = r.getNextID()
+ r.typeToSchema[ot] = schm
+ r.idToType[schm.GetId()] = ot
+ return schm, nil
}
- return schm, nil
+
+ t := reflectx.SkipPtr(ot)
+
+ schm, err := r.structToSchema(t)
+ if err != nil {
+ return nil, err
+ }
+ // Cache the pointer type here with it's own id.
+ pt := reflect.PtrTo(t)
+ schm = proto.Clone(schm).(*pipepb.Schema)
+ schm.Id = r.getNextID()
+ schm.Options = append(schm.Options, &pipepb.Option{
+ Name: optGoNillable,
+ })
+ r.idToType[schm.GetId()] = pt
+ r.typeToSchema[pt] = schm
+
+ // Return whatever the original type was.
+ return r.typeToSchema[ot], nil
Review comment:
After reading a little further down, I think I might be missing
some implicit assumption here. Is everything after line 360 assuming that if
the type wasn't found above in either `typeToSchema` or
`logicalTypeToFieldType`, then it must be a pointer type? We return the
original type, but it wouldn't be present in that map unless `ot` and `pt`
matched, right?
Edit: Or maybe this works because we called `structToSchema` on the
non-pointer version and then also added the pointer version, so one or the
other is guaranteed to be present? That also seems worth a comment, like:
```
// Both pointer and non-pointer variants of the original type are
// registered, so we can return whatever the original type was.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]