zeroshade commented on code in PR #40496:
URL: https://github.com/apache/arrow/pull/40496#discussion_r1530599901


##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util

Review Comment:
   Needs the Apache license header.



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {

Review Comment:
   Can we get a godoc comment on this?



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}
+
+func (psr ProtobufStructReflection) unmarshallAny() ProtobufStructReflection {
+       if psr.descriptor.FullName() == "google.protobuf.Any" {
+               for psr.rValue.Type().Kind() == reflect.Ptr {
+                       psr.rValue = reflect.Indirect(psr.rValue)
+               }
+               fieldValueAsAny, _ := psr.rValue.Interface().(anypb.Any)
+               msg, _ := fieldValueAsAny.UnmarshalNew()
+
+               v := reflect.ValueOf(msg)
+               for v.Kind() == reflect.Ptr {
+                       v = reflect.Indirect(v)
+               }
+
+               return ProtobufStructReflection{
+                       descriptor:    msg.ProtoReflect().Descriptor(),
+                       message:       msg.ProtoReflect(),
+                       rValue:        v,
+                       SchemaOptions: psr.SchemaOptions,
+               }
+       } else {
+               return psr
+       }
+}
+
+func (psr ProtobufStructReflection) GetArrowFields() []arrow.Field {
+       var fields []arrow.Field
+
+       for pfr := range psr.generateStructFields() {
+               fields = append(fields, arrow.Field{
+                       Name:     
psr.fieldNameFormatter(string(pfr.descriptor.Name())),
+                       Type:     pfr.getDataType(),
+                       Nullable: true,
+               })
+       }
+
+       return fields
+}
+
+func (psr ProtobufStructReflection) GetSchema() *arrow.Schema {
+       return arrow.NewSchema(psr.GetArrowFields(), nil)
+}
+
+type ProtobufListReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pfr ProtobufFieldReflection) AsList() ProtobufListReflection {
+       return ProtobufListReflection{pfr}
+}
+
+func (plr ProtobufListReflection) getDataType() arrow.DataType {
+       for li := range plr.generateListItems() {
+               return arrow.ListOf(li.getDataType())
+       }
+       return nil
+}
+
+func (pfr ProtobufFieldReflection) AsMap() ProtobufMapReflection {
+       return ProtobufMapReflection{pfr}
+}
+
+type ProtobufMapReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapReflection) getDataType() arrow.DataType {
+       for kvp := range pmr.generateKeyValuePairs() {
+               return kvp.getDataType()
+       }
+       return nil
+}
+
+type ProtobufMapKeyValuePairReflection struct {
+       k ProtobufFieldReflection
+       v ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapKeyValuePairReflection) getDataType() arrow.DataType {
+       return arrow.MapOf(pmr.k.getDataType(), pmr.v.getDataType())
+}
+
+func (pmr ProtobufMapReflection) generateKeyValuePairs() chan 
ProtobufMapKeyValuePairReflection {
+       out := make(chan ProtobufMapKeyValuePairReflection)
+
+       go func() {
+               defer close(out)
+               for _, k := range pmr.rValue.MapKeys() {
+                       kvp := ProtobufMapKeyValuePairReflection{
+                               k: ProtobufFieldReflection{
+                                       descriptor:    pmr.descriptor.MapKey(),
+                                       prValue:       getMapKey(k),
+                                       rValue:        k,
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                               v: ProtobufFieldReflection{
+                                       descriptor:    
pmr.descriptor.MapValue(),
+                                       prValue:       
pmr.prValue.Map().Get(protoreflect.MapKey(getMapKey(k))),
+                                       rValue:        pmr.rValue.MapIndex(k),
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                       }
+                       out <- kvp
+               }
+       }()
+
+       return out
+}
+
+func (psr ProtobufStructReflection) generateStructFields() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               fds := psr.descriptor.Fields()
+               for i := 0; i < fds.Len(); i++ {
+                       pfr := psr.getFieldByName(string(fds.Get(i).Name()))
+                       if psr.exclusionPolicy(pfr) {
+                               continue
+                       }
+                       out <- pfr
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) AsStruct() ProtobufStructReflection {
+       psr := ProtobufStructReflection{
+               descriptor:    pfr.descriptor.Message(),
+               message:       pfr.prValue.Message(),
+               rValue:        pfr.rValue,
+               SchemaOptions: pfr.SchemaOptions,
+       }
+       psr = psr.unmarshallAny()
+       return psr
+}
+
+func (psr ProtobufStructReflection) getDataType() arrow.DataType {
+       return arrow.StructOf(psr.GetArrowFields()...)
+}
+
+func (psr ProtobufStructReflection) getFieldByName(n string) 
ProtobufFieldReflection {
+       fd := psr.descriptor.Fields().ByTextName(xstrings.ToSnakeCase(n))
+       fv := psr.rValue
+       if fv.IsValid() {
+               if !fv.IsZero() {
+                       for fv.Kind() == reflect.Ptr || fv.Kind() == 
reflect.Interface {
+                               fv = fv.Elem()
+                       }
+                       if fd.ContainingOneof() != nil {
+                               n = string(fd.ContainingOneof().Name())
+                       }
+                       fv = fv.FieldByName(xstrings.ToCamelCase(n))
+                       for fv.Kind() == reflect.Ptr {
+                               fv = fv.Elem()
+                       }
+               }
+       }
+       return ProtobufFieldReflection{
+               fd,
+               psr.message.Get(fd),
+               fv,
+               psr.SchemaOptions,
+       }
+}
+
+type ProtobufFieldReflection struct {
+       descriptor protoreflect.FieldDescriptor
+       prValue    protoreflect.Value
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+func (pfr ProtobufFieldReflection) isStruct() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
!pfr.descriptor.IsMap() && pfr.rValue.Kind() != reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) isMap() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
pfr.descriptor.IsMap()
+}
+
+func (pfr ProtobufFieldReflection) isList() bool {
+       return pfr.descriptor.IsList() && pfr.rValue.Kind() == reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) getListLength() int {
+       return pfr.prValue.List().Len()
+}
+
+func (pfr ProtobufFieldReflection) getMapLength() int {
+       return pfr.prValue.Map().Len()
+}
+
+func (plr ProtobufListReflection) generateListItems() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               for i := 0; i < plr.prValue.List().Len(); i++ {
+                       out <- ProtobufFieldReflection{
+                               descriptor:    plr.descriptor,
+                               prValue:       plr.prValue.List().Get(i),
+                               rValue:        plr.rValue.Index(i),
+                               SchemaOptions: plr.SchemaOptions,
+                       }
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) getDataType() arrow.DataType {
+       var dt arrow.DataType
+
+       typeMap := map[protoreflect.Kind]arrow.DataType{
+               //Numeric
+               protoreflect.Int32Kind:    arrow.PrimitiveTypes.Int32,
+               protoreflect.Int64Kind:    arrow.PrimitiveTypes.Int64,
+               protoreflect.Sint32Kind:   arrow.PrimitiveTypes.Int32,
+               protoreflect.Sint64Kind:   arrow.PrimitiveTypes.Int64,
+               protoreflect.Uint32Kind:   arrow.PrimitiveTypes.Uint32,
+               protoreflect.Uint64Kind:   arrow.PrimitiveTypes.Uint64,
+               protoreflect.Fixed32Kind:  arrow.PrimitiveTypes.Uint32,
+               protoreflect.Fixed64Kind:  arrow.PrimitiveTypes.Uint64,
+               protoreflect.Sfixed32Kind: arrow.PrimitiveTypes.Int32,
+               protoreflect.Sfixed64Kind: arrow.PrimitiveTypes.Int64,
+               protoreflect.FloatKind:    arrow.PrimitiveTypes.Float32,
+               protoreflect.DoubleKind:   arrow.PrimitiveTypes.Float64,
+               //Binary
+               protoreflect.StringKind: arrow.BinaryTypes.String,
+               protoreflect.BytesKind:  arrow.BinaryTypes.Binary,
+               //Fixed Width
+               protoreflect.BoolKind: arrow.FixedWidthTypes.Boolean,
+               // Enum
+               protoreflect.EnumKind: arrow.PrimitiveTypes.Int32,

Review Comment:
   Should an enum be represented as a dictionary type, using the strings as keys?



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}
+
+func (psr ProtobufStructReflection) unmarshallAny() ProtobufStructReflection {
+       if psr.descriptor.FullName() == "google.protobuf.Any" {
+               for psr.rValue.Type().Kind() == reflect.Ptr {
+                       psr.rValue = reflect.Indirect(psr.rValue)
+               }
+               fieldValueAsAny, _ := psr.rValue.Interface().(anypb.Any)
+               msg, _ := fieldValueAsAny.UnmarshalNew()
+
+               v := reflect.ValueOf(msg)
+               for v.Kind() == reflect.Ptr {
+                       v = reflect.Indirect(v)
+               }
+
+               return ProtobufStructReflection{
+                       descriptor:    msg.ProtoReflect().Descriptor(),
+                       message:       msg.ProtoReflect(),
+                       rValue:        v,
+                       SchemaOptions: psr.SchemaOptions,
+               }
+       } else {
+               return psr
+       }
+}
+
+func (psr ProtobufStructReflection) GetArrowFields() []arrow.Field {
+       var fields []arrow.Field
+
+       for pfr := range psr.generateStructFields() {
+               fields = append(fields, arrow.Field{
+                       Name:     
psr.fieldNameFormatter(string(pfr.descriptor.Name())),
+                       Type:     pfr.getDataType(),
+                       Nullable: true,
+               })
+       }
+
+       return fields
+}
+
+func (psr ProtobufStructReflection) GetSchema() *arrow.Schema {
+       return arrow.NewSchema(psr.GetArrowFields(), nil)
+}
+
+type ProtobufListReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pfr ProtobufFieldReflection) AsList() ProtobufListReflection {
+       return ProtobufListReflection{pfr}
+}
+
+func (plr ProtobufListReflection) getDataType() arrow.DataType {
+       for li := range plr.generateListItems() {
+               return arrow.ListOf(li.getDataType())
+       }
+       return nil
+}
+
+func (pfr ProtobufFieldReflection) AsMap() ProtobufMapReflection {
+       return ProtobufMapReflection{pfr}
+}
+
+type ProtobufMapReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapReflection) getDataType() arrow.DataType {
+       for kvp := range pmr.generateKeyValuePairs() {
+               return kvp.getDataType()
+       }
+       return nil
+}
+
+type ProtobufMapKeyValuePairReflection struct {
+       k ProtobufFieldReflection
+       v ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapKeyValuePairReflection) getDataType() arrow.DataType {
+       return arrow.MapOf(pmr.k.getDataType(), pmr.v.getDataType())
+}
+
+func (pmr ProtobufMapReflection) generateKeyValuePairs() chan 
ProtobufMapKeyValuePairReflection {
+       out := make(chan ProtobufMapKeyValuePairReflection)
+
+       go func() {
+               defer close(out)
+               for _, k := range pmr.rValue.MapKeys() {
+                       kvp := ProtobufMapKeyValuePairReflection{
+                               k: ProtobufFieldReflection{
+                                       descriptor:    pmr.descriptor.MapKey(),
+                                       prValue:       getMapKey(k),
+                                       rValue:        k,
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                               v: ProtobufFieldReflection{
+                                       descriptor:    
pmr.descriptor.MapValue(),
+                                       prValue:       
pmr.prValue.Map().Get(protoreflect.MapKey(getMapKey(k))),
+                                       rValue:        pmr.rValue.MapIndex(k),
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                       }
+                       out <- kvp
+               }
+       }()
+
+       return out
+}
+
+func (psr ProtobufStructReflection) generateStructFields() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               fds := psr.descriptor.Fields()
+               for i := 0; i < fds.Len(); i++ {
+                       pfr := psr.getFieldByName(string(fds.Get(i).Name()))
+                       if psr.exclusionPolicy(pfr) {
+                               continue
+                       }
+                       out <- pfr
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) AsStruct() ProtobufStructReflection {
+       psr := ProtobufStructReflection{
+               descriptor:    pfr.descriptor.Message(),
+               message:       pfr.prValue.Message(),
+               rValue:        pfr.rValue,
+               SchemaOptions: pfr.SchemaOptions,
+       }
+       psr = psr.unmarshallAny()
+       return psr
+}
+
+func (psr ProtobufStructReflection) getDataType() arrow.DataType {
+       return arrow.StructOf(psr.GetArrowFields()...)
+}
+
+func (psr ProtobufStructReflection) getFieldByName(n string) 
ProtobufFieldReflection {
+       fd := psr.descriptor.Fields().ByTextName(xstrings.ToSnakeCase(n))
+       fv := psr.rValue
+       if fv.IsValid() {
+               if !fv.IsZero() {
+                       for fv.Kind() == reflect.Ptr || fv.Kind() == 
reflect.Interface {
+                               fv = fv.Elem()
+                       }
+                       if fd.ContainingOneof() != nil {
+                               n = string(fd.ContainingOneof().Name())
+                       }
+                       fv = fv.FieldByName(xstrings.ToCamelCase(n))
+                       for fv.Kind() == reflect.Ptr {
+                               fv = fv.Elem()
+                       }
+               }
+       }
+       return ProtobufFieldReflection{
+               fd,
+               psr.message.Get(fd),
+               fv,
+               psr.SchemaOptions,
+       }
+}
+
+type ProtobufFieldReflection struct {
+       descriptor protoreflect.FieldDescriptor
+       prValue    protoreflect.Value
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+func (pfr ProtobufFieldReflection) isStruct() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
!pfr.descriptor.IsMap() && pfr.rValue.Kind() != reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) isMap() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
pfr.descriptor.IsMap()
+}
+
+func (pfr ProtobufFieldReflection) isList() bool {
+       return pfr.descriptor.IsList() && pfr.rValue.Kind() == reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) getListLength() int {
+       return pfr.prValue.List().Len()
+}
+
+func (pfr ProtobufFieldReflection) getMapLength() int {
+       return pfr.prValue.Map().Len()
+}
+
+func (plr ProtobufListReflection) generateListItems() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               for i := 0; i < plr.prValue.List().Len(); i++ {
+                       out <- ProtobufFieldReflection{
+                               descriptor:    plr.descriptor,
+                               prValue:       plr.prValue.List().Get(i),
+                               rValue:        plr.rValue.Index(i),
+                               SchemaOptions: plr.SchemaOptions,
+                       }
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) getDataType() arrow.DataType {
+       var dt arrow.DataType
+
+       typeMap := map[protoreflect.Kind]arrow.DataType{
+               //Numeric
+               protoreflect.Int32Kind:    arrow.PrimitiveTypes.Int32,
+               protoreflect.Int64Kind:    arrow.PrimitiveTypes.Int64,
+               protoreflect.Sint32Kind:   arrow.PrimitiveTypes.Int32,
+               protoreflect.Sint64Kind:   arrow.PrimitiveTypes.Int64,
+               protoreflect.Uint32Kind:   arrow.PrimitiveTypes.Uint32,
+               protoreflect.Uint64Kind:   arrow.PrimitiveTypes.Uint64,
+               protoreflect.Fixed32Kind:  arrow.PrimitiveTypes.Uint32,
+               protoreflect.Fixed64Kind:  arrow.PrimitiveTypes.Uint64,
+               protoreflect.Sfixed32Kind: arrow.PrimitiveTypes.Int32,
+               protoreflect.Sfixed64Kind: arrow.PrimitiveTypes.Int64,
+               protoreflect.FloatKind:    arrow.PrimitiveTypes.Float32,
+               protoreflect.DoubleKind:   arrow.PrimitiveTypes.Float64,
+               //Binary
+               protoreflect.StringKind: arrow.BinaryTypes.String,
+               protoreflect.BytesKind:  arrow.BinaryTypes.Binary,
+               //Fixed Width
+               protoreflect.BoolKind: arrow.FixedWidthTypes.Boolean,
+               // Enum
+               protoreflect.EnumKind: arrow.PrimitiveTypes.Int32,
+               // Struct
+               protoreflect.MessageKind: nil,
+       }
+       dt = typeMap[pfr.descriptor.Kind()]
+
+       if pfr.isStruct() {
+               dt = pfr.AsStruct().getDataType()
+       }
+
+       if pfr.isMap() {
+               dt = pfr.AsMap().getDataType()
+       }
+
+       if pfr.isList() {
+               dt = pfr.AsList().getDataType()
+       }
+       return dt
+}
+
+func getBuilders(s *arrow.Schema, m memory.Allocator) []array.Builder {
+       var builders []array.Builder
+
+       for _, f := range s.Fields() {
+               builders = append(builders, array.NewBuilder(m, f.Type))
+       }
+       return builders
+}
+
+func RecordFromProtobuf(psr ProtobufStructReflection, schema *arrow.Schema) 
arrow.Record {
+       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       bldrs := getBuilders(schema, mem)

Review Comment:
   This should take the allocator as an argument and, if it is nil, fall 
back to `memory.DefaultAllocator`.
   
   Also, `NewCheckedAllocator` exists to confirm that there are no leaks. If 
we aren't using it to verify memory usage, don't use it here; it's 
typically used only in tests.
   
   `getBuilders` can simply be replaced with `array.NewRecordBuilder`.



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}

Review Comment:
   Can we get documentation comments on these to explain what they are for?



##########
go/arrow/util/messages/types.proto:
##########
@@ -0,0 +1,40 @@
+syntax = "proto3";
+import "google/protobuf/any.proto";

Review Comment:
   Needs the Apache license header.



##########
go/arrow/util/protobuf_reflect_test.go:
##########
@@ -0,0 +1,140 @@
+package util

Review Comment:
   Needs the Apache license header.



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}
+
+func (psr ProtobufStructReflection) unmarshallAny() ProtobufStructReflection {
+       if psr.descriptor.FullName() == "google.protobuf.Any" {
+               for psr.rValue.Type().Kind() == reflect.Ptr {
+                       psr.rValue = reflect.Indirect(psr.rValue)
+               }
+               fieldValueAsAny, _ := psr.rValue.Interface().(anypb.Any)
+               msg, _ := fieldValueAsAny.UnmarshalNew()
+
+               v := reflect.ValueOf(msg)
+               for v.Kind() == reflect.Ptr {
+                       v = reflect.Indirect(v)
+               }
+
+               return ProtobufStructReflection{
+                       descriptor:    msg.ProtoReflect().Descriptor(),
+                       message:       msg.ProtoReflect(),
+                       rValue:        v,
+                       SchemaOptions: psr.SchemaOptions,
+               }
+       } else {
+               return psr
+       }
+}
+
+func (psr ProtobufStructReflection) GetArrowFields() []arrow.Field {
+       var fields []arrow.Field
+
+       for pfr := range psr.generateStructFields() {
+               fields = append(fields, arrow.Field{
+                       Name:     
psr.fieldNameFormatter(string(pfr.descriptor.Name())),
+                       Type:     pfr.getDataType(),
+                       Nullable: true,
+               })
+       }
+
+       return fields
+}
+
+func (psr ProtobufStructReflection) GetSchema() *arrow.Schema {
+       return arrow.NewSchema(psr.GetArrowFields(), nil)
+}
+
+type ProtobufListReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pfr ProtobufFieldReflection) AsList() ProtobufListReflection {
+       return ProtobufListReflection{pfr}
+}
+
+func (plr ProtobufListReflection) getDataType() arrow.DataType {
+       for li := range plr.generateListItems() {
+               return arrow.ListOf(li.getDataType())
+       }
+       return nil
+}
+
+func (pfr ProtobufFieldReflection) AsMap() ProtobufMapReflection {
+       return ProtobufMapReflection{pfr}
+}
+
+type ProtobufMapReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapReflection) getDataType() arrow.DataType {
+       for kvp := range pmr.generateKeyValuePairs() {
+               return kvp.getDataType()
+       }
+       return nil
+}
+
+type ProtobufMapKeyValuePairReflection struct {
+       k ProtobufFieldReflection
+       v ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapKeyValuePairReflection) getDataType() arrow.DataType {
+       return arrow.MapOf(pmr.k.getDataType(), pmr.v.getDataType())
+}
+
+func (pmr ProtobufMapReflection) generateKeyValuePairs() chan 
ProtobufMapKeyValuePairReflection {
+       out := make(chan ProtobufMapKeyValuePairReflection)
+
+       go func() {
+               defer close(out)
+               for _, k := range pmr.rValue.MapKeys() {
+                       kvp := ProtobufMapKeyValuePairReflection{
+                               k: ProtobufFieldReflection{
+                                       descriptor:    pmr.descriptor.MapKey(),
+                                       prValue:       getMapKey(k),
+                                       rValue:        k,
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                               v: ProtobufFieldReflection{
+                                       descriptor:    
pmr.descriptor.MapValue(),
+                                       prValue:       
pmr.prValue.Map().Get(protoreflect.MapKey(getMapKey(k))),
+                                       rValue:        pmr.rValue.MapIndex(k),
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                       }
+                       out <- kvp
+               }
+       }()
+
+       return out
+}
+
+func (psr ProtobufStructReflection) generateStructFields() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               fds := psr.descriptor.Fields()
+               for i := 0; i < fds.Len(); i++ {
+                       pfr := psr.getFieldByName(string(fds.Get(i).Name()))
+                       if psr.exclusionPolicy(pfr) {
+                               continue
+                       }
+                       out <- pfr
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) AsStruct() ProtobufStructReflection {
+       psr := ProtobufStructReflection{
+               descriptor:    pfr.descriptor.Message(),
+               message:       pfr.prValue.Message(),
+               rValue:        pfr.rValue,
+               SchemaOptions: pfr.SchemaOptions,
+       }
+       psr = psr.unmarshallAny()
+       return psr
+}
+
+func (psr ProtobufStructReflection) getDataType() arrow.DataType {
+       return arrow.StructOf(psr.GetArrowFields()...)
+}
+
+func (psr ProtobufStructReflection) getFieldByName(n string) 
ProtobufFieldReflection {
+       fd := psr.descriptor.Fields().ByTextName(xstrings.ToSnakeCase(n))
+       fv := psr.rValue
+       if fv.IsValid() {
+               if !fv.IsZero() {
+                       for fv.Kind() == reflect.Ptr || fv.Kind() == 
reflect.Interface {
+                               fv = fv.Elem()
+                       }
+                       if fd.ContainingOneof() != nil {
+                               n = string(fd.ContainingOneof().Name())
+                       }
+                       fv = fv.FieldByName(xstrings.ToCamelCase(n))
+                       for fv.Kind() == reflect.Ptr {
+                               fv = fv.Elem()
+                       }
+               }
+       }
+       return ProtobufFieldReflection{
+               fd,
+               psr.message.Get(fd),
+               fv,
+               psr.SchemaOptions,
+       }
+}
+
+type ProtobufFieldReflection struct {
+       descriptor protoreflect.FieldDescriptor
+       prValue    protoreflect.Value
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+func (pfr ProtobufFieldReflection) isStruct() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
!pfr.descriptor.IsMap() && pfr.rValue.Kind() != reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) isMap() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
pfr.descriptor.IsMap()
+}
+
+func (pfr ProtobufFieldReflection) isList() bool {
+       return pfr.descriptor.IsList() && pfr.rValue.Kind() == reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) getListLength() int {
+       return pfr.prValue.List().Len()
+}
+
+func (pfr ProtobufFieldReflection) getMapLength() int {
+       return pfr.prValue.Map().Len()
+}
+
+func (plr ProtobufListReflection) generateListItems() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               for i := 0; i < plr.prValue.List().Len(); i++ {
+                       out <- ProtobufFieldReflection{
+                               descriptor:    plr.descriptor,
+                               prValue:       plr.prValue.List().Get(i),
+                               rValue:        plr.rValue.Index(i),
+                               SchemaOptions: plr.SchemaOptions,
+                       }
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) getDataType() arrow.DataType {
+       var dt arrow.DataType
+
+       typeMap := map[protoreflect.Kind]arrow.DataType{
+               //Numeric
+               protoreflect.Int32Kind:    arrow.PrimitiveTypes.Int32,
+               protoreflect.Int64Kind:    arrow.PrimitiveTypes.Int64,
+               protoreflect.Sint32Kind:   arrow.PrimitiveTypes.Int32,
+               protoreflect.Sint64Kind:   arrow.PrimitiveTypes.Int64,
+               protoreflect.Uint32Kind:   arrow.PrimitiveTypes.Uint32,
+               protoreflect.Uint64Kind:   arrow.PrimitiveTypes.Uint64,
+               protoreflect.Fixed32Kind:  arrow.PrimitiveTypes.Uint32,
+               protoreflect.Fixed64Kind:  arrow.PrimitiveTypes.Uint64,
+               protoreflect.Sfixed32Kind: arrow.PrimitiveTypes.Int32,
+               protoreflect.Sfixed64Kind: arrow.PrimitiveTypes.Int64,
+               protoreflect.FloatKind:    arrow.PrimitiveTypes.Float32,
+               protoreflect.DoubleKind:   arrow.PrimitiveTypes.Float64,
+               //Binary
+               protoreflect.StringKind: arrow.BinaryTypes.String,
+               protoreflect.BytesKind:  arrow.BinaryTypes.Binary,
+               //Fixed Width
+               protoreflect.BoolKind: arrow.FixedWidthTypes.Boolean,
+               // Enum
+               protoreflect.EnumKind: arrow.PrimitiveTypes.Int32,
+               // Struct
+               protoreflect.MessageKind: nil,
+       }
+       dt = typeMap[pfr.descriptor.Kind()]
+
+       if pfr.isStruct() {
+               dt = pfr.AsStruct().getDataType()
+       }
+
+       if pfr.isMap() {
+               dt = pfr.AsMap().getDataType()
+       }
+
+       if pfr.isList() {
+               dt = pfr.AsList().getDataType()
+       }

Review Comment:
   This should probably be a `switch` instead of a series of separate `if` checks



##########
go/arrow/util/messages/README.md:
##########
@@ -0,0 +1,6 @@
+How to generate the .pb.go files

Review Comment:
   You need to add the Apache License header to the file; see the other README 
files for how to do it in Markdown.



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}
+
+func (psr ProtobufStructReflection) unmarshallAny() ProtobufStructReflection {
+       if psr.descriptor.FullName() == "google.protobuf.Any" {
+               for psr.rValue.Type().Kind() == reflect.Ptr {
+                       psr.rValue = reflect.Indirect(psr.rValue)
+               }
+               fieldValueAsAny, _ := psr.rValue.Interface().(anypb.Any)
+               msg, _ := fieldValueAsAny.UnmarshalNew()
+
+               v := reflect.ValueOf(msg)
+               for v.Kind() == reflect.Ptr {
+                       v = reflect.Indirect(v)
+               }
+
+               return ProtobufStructReflection{
+                       descriptor:    msg.ProtoReflect().Descriptor(),
+                       message:       msg.ProtoReflect(),
+                       rValue:        v,
+                       SchemaOptions: psr.SchemaOptions,
+               }
+       } else {
+               return psr
+       }
+}
+
+func (psr ProtobufStructReflection) GetArrowFields() []arrow.Field {
+       var fields []arrow.Field
+
+       for pfr := range psr.generateStructFields() {
+               fields = append(fields, arrow.Field{
+                       Name:     
psr.fieldNameFormatter(string(pfr.descriptor.Name())),
+                       Type:     pfr.getDataType(),
+                       Nullable: true,
+               })
+       }
+
+       return fields
+}
+
+func (psr ProtobufStructReflection) GetSchema() *arrow.Schema {
+       return arrow.NewSchema(psr.GetArrowFields(), nil)
+}
+
+type ProtobufListReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pfr ProtobufFieldReflection) AsList() ProtobufListReflection {
+       return ProtobufListReflection{pfr}
+}
+
+func (plr ProtobufListReflection) getDataType() arrow.DataType {
+       for li := range plr.generateListItems() {
+               return arrow.ListOf(li.getDataType())
+       }
+       return nil
+}
+
+func (pfr ProtobufFieldReflection) AsMap() ProtobufMapReflection {
+       return ProtobufMapReflection{pfr}
+}
+
+type ProtobufMapReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapReflection) getDataType() arrow.DataType {
+       for kvp := range pmr.generateKeyValuePairs() {
+               return kvp.getDataType()
+       }
+       return nil
+}
+
+type ProtobufMapKeyValuePairReflection struct {
+       k ProtobufFieldReflection
+       v ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapKeyValuePairReflection) getDataType() arrow.DataType {
+       return arrow.MapOf(pmr.k.getDataType(), pmr.v.getDataType())
+}
+
+func (pmr ProtobufMapReflection) generateKeyValuePairs() chan 
ProtobufMapKeyValuePairReflection {
+       out := make(chan ProtobufMapKeyValuePairReflection)
+
+       go func() {
+               defer close(out)
+               for _, k := range pmr.rValue.MapKeys() {
+                       kvp := ProtobufMapKeyValuePairReflection{
+                               k: ProtobufFieldReflection{
+                                       descriptor:    pmr.descriptor.MapKey(),
+                                       prValue:       getMapKey(k),
+                                       rValue:        k,
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                               v: ProtobufFieldReflection{
+                                       descriptor:    
pmr.descriptor.MapValue(),
+                                       prValue:       
pmr.prValue.Map().Get(protoreflect.MapKey(getMapKey(k))),
+                                       rValue:        pmr.rValue.MapIndex(k),
+                                       SchemaOptions: pmr.SchemaOptions,
+                               },
+                       }
+                       out <- kvp
+               }
+       }()
+
+       return out
+}
+
+func (psr ProtobufStructReflection) generateStructFields() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               fds := psr.descriptor.Fields()
+               for i := 0; i < fds.Len(); i++ {
+                       pfr := psr.getFieldByName(string(fds.Get(i).Name()))
+                       if psr.exclusionPolicy(pfr) {
+                               continue
+                       }
+                       out <- pfr
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) AsStruct() ProtobufStructReflection {
+       psr := ProtobufStructReflection{
+               descriptor:    pfr.descriptor.Message(),
+               message:       pfr.prValue.Message(),
+               rValue:        pfr.rValue,
+               SchemaOptions: pfr.SchemaOptions,
+       }
+       psr = psr.unmarshallAny()
+       return psr
+}
+
+func (psr ProtobufStructReflection) getDataType() arrow.DataType {
+       return arrow.StructOf(psr.GetArrowFields()...)
+}
+
+func (psr ProtobufStructReflection) getFieldByName(n string) 
ProtobufFieldReflection {
+       fd := psr.descriptor.Fields().ByTextName(xstrings.ToSnakeCase(n))
+       fv := psr.rValue
+       if fv.IsValid() {
+               if !fv.IsZero() {
+                       for fv.Kind() == reflect.Ptr || fv.Kind() == 
reflect.Interface {
+                               fv = fv.Elem()
+                       }
+                       if fd.ContainingOneof() != nil {
+                               n = string(fd.ContainingOneof().Name())
+                       }
+                       fv = fv.FieldByName(xstrings.ToCamelCase(n))
+                       for fv.Kind() == reflect.Ptr {
+                               fv = fv.Elem()
+                       }
+               }
+       }
+       return ProtobufFieldReflection{
+               fd,
+               psr.message.Get(fd),
+               fv,
+               psr.SchemaOptions,
+       }
+}
+
+type ProtobufFieldReflection struct {
+       descriptor protoreflect.FieldDescriptor
+       prValue    protoreflect.Value
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+func (pfr ProtobufFieldReflection) isStruct() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
!pfr.descriptor.IsMap() && pfr.rValue.Kind() != reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) isMap() bool {
+       return pfr.descriptor.Kind() == protoreflect.MessageKind && 
pfr.descriptor.IsMap()
+}
+
+func (pfr ProtobufFieldReflection) isList() bool {
+       return pfr.descriptor.IsList() && pfr.rValue.Kind() == reflect.Slice
+}
+
+func (pfr ProtobufFieldReflection) getListLength() int {
+       return pfr.prValue.List().Len()
+}
+
+func (pfr ProtobufFieldReflection) getMapLength() int {
+       return pfr.prValue.Map().Len()
+}
+
+func (plr ProtobufListReflection) generateListItems() chan 
ProtobufFieldReflection {
+       out := make(chan ProtobufFieldReflection)
+
+       go func() {
+               defer close(out)
+               for i := 0; i < plr.prValue.List().Len(); i++ {
+                       out <- ProtobufFieldReflection{
+                               descriptor:    plr.descriptor,
+                               prValue:       plr.prValue.List().Get(i),
+                               rValue:        plr.rValue.Index(i),
+                               SchemaOptions: plr.SchemaOptions,
+                       }
+               }
+       }()
+
+       return out
+}
+
+func (pfr ProtobufFieldReflection) getDataType() arrow.DataType {
+       var dt arrow.DataType
+
+       typeMap := map[protoreflect.Kind]arrow.DataType{
+               //Numeric
+               protoreflect.Int32Kind:    arrow.PrimitiveTypes.Int32,
+               protoreflect.Int64Kind:    arrow.PrimitiveTypes.Int64,
+               protoreflect.Sint32Kind:   arrow.PrimitiveTypes.Int32,
+               protoreflect.Sint64Kind:   arrow.PrimitiveTypes.Int64,
+               protoreflect.Uint32Kind:   arrow.PrimitiveTypes.Uint32,
+               protoreflect.Uint64Kind:   arrow.PrimitiveTypes.Uint64,
+               protoreflect.Fixed32Kind:  arrow.PrimitiveTypes.Uint32,
+               protoreflect.Fixed64Kind:  arrow.PrimitiveTypes.Uint64,
+               protoreflect.Sfixed32Kind: arrow.PrimitiveTypes.Int32,
+               protoreflect.Sfixed64Kind: arrow.PrimitiveTypes.Int64,
+               protoreflect.FloatKind:    arrow.PrimitiveTypes.Float32,
+               protoreflect.DoubleKind:   arrow.PrimitiveTypes.Float64,
+               //Binary
+               protoreflect.StringKind: arrow.BinaryTypes.String,
+               protoreflect.BytesKind:  arrow.BinaryTypes.Binary,
+               //Fixed Width
+               protoreflect.BoolKind: arrow.FixedWidthTypes.Boolean,
+               // Enum
+               protoreflect.EnumKind: arrow.PrimitiveTypes.Int32,
+               // Struct
+               protoreflect.MessageKind: nil,
+       }
+       dt = typeMap[pfr.descriptor.Kind()]
+
+       if pfr.isStruct() {
+               dt = pfr.AsStruct().getDataType()
+       }
+
+       if pfr.isMap() {
+               dt = pfr.AsMap().getDataType()
+       }
+
+       if pfr.isList() {
+               dt = pfr.AsList().getDataType()
+       }
+       return dt
+}
+
+func getBuilders(s *arrow.Schema, m memory.Allocator) []array.Builder {
+       var builders []array.Builder
+
+       for _, f := range s.Fields() {
+               builders = append(builders, array.NewBuilder(m, f.Type))
+       }
+       return builders

Review Comment:
   It would probably be more efficient to create a single `array.RecordBuilder` 
from the schema and return that, instead of a slice of individual builders.



##########
go/arrow/util/util_message/types.pb.go:
##########
@@ -0,0 +1,522 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.

Review Comment:
   This file should get added to dev/release/rat_exclude_files.txt since it is 
a generated file



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}
+
+func (psr ProtobufStructReflection) unmarshallAny() ProtobufStructReflection {
+       if psr.descriptor.FullName() == "google.protobuf.Any" {
+               for psr.rValue.Type().Kind() == reflect.Ptr {
+                       psr.rValue = reflect.Indirect(psr.rValue)
+               }
+               fieldValueAsAny, _ := psr.rValue.Interface().(anypb.Any)
+               msg, _ := fieldValueAsAny.UnmarshalNew()
+
+               v := reflect.ValueOf(msg)
+               for v.Kind() == reflect.Ptr {
+                       v = reflect.Indirect(v)
+               }
+
+               return ProtobufStructReflection{
+                       descriptor:    msg.ProtoReflect().Descriptor(),
+                       message:       msg.ProtoReflect(),
+                       rValue:        v,
+                       SchemaOptions: psr.SchemaOptions,
+               }
+       } else {
+               return psr
+       }
+}
+
+func (psr ProtobufStructReflection) GetArrowFields() []arrow.Field {
+       var fields []arrow.Field
+
+       for pfr := range psr.generateStructFields() {
+               fields = append(fields, arrow.Field{
+                       Name:     
psr.fieldNameFormatter(string(pfr.descriptor.Name())),
+                       Type:     pfr.getDataType(),
+                       Nullable: true,
+               })
+       }
+
+       return fields
+}
+
+func (psr ProtobufStructReflection) GetSchema() *arrow.Schema {
+       return arrow.NewSchema(psr.GetArrowFields(), nil)
+}
+
+type ProtobufListReflection struct {

Review Comment:
   Godoc comments? Same for the other public-facing structs.



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}
+
+type ProtobufStructReflection struct {
+       descriptor protoreflect.MessageDescriptor
+       message    protoreflect.Message
+       rValue     reflect.Value
+       SchemaOptions
+}
+
+type Option func(*ProtobufStructReflection)
+
+func NewProtobufStructReflection(msg proto.Message, options ...Option) 
*ProtobufStructReflection {
+       v := reflect.ValueOf(msg)
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+       includeAll := func(pfr ProtobufFieldReflection) bool {
+               return false
+       }
+       noFormatting := func(str string) string {
+               return str
+       }
+       psr := &ProtobufStructReflection{
+               descriptor: msg.ProtoReflect().Descriptor(),
+               message:    msg.ProtoReflect(),
+               rValue:     v,
+               SchemaOptions: SchemaOptions{
+                       exclusionPolicy:    includeAll,
+                       fieldNameFormatter: noFormatting,
+               },
+       }
+
+       for _, opt := range options {
+               opt(psr)
+       }
+
+       return psr
+}
+
+func WithExclusionPolicy(ex func(pfr ProtobufFieldReflection) bool) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.exclusionPolicy = ex
+       }
+}
+
+func WithFieldNameFormatter(formatter func(str string) string) Option {
+       return func(psr *ProtobufStructReflection) {
+               psr.fieldNameFormatter = formatter
+       }
+}
+
+func (psr ProtobufStructReflection) unmarshallAny() ProtobufStructReflection {
+       if psr.descriptor.FullName() == "google.protobuf.Any" {
+               for psr.rValue.Type().Kind() == reflect.Ptr {
+                       psr.rValue = reflect.Indirect(psr.rValue)
+               }
+               fieldValueAsAny, _ := psr.rValue.Interface().(anypb.Any)
+               msg, _ := fieldValueAsAny.UnmarshalNew()
+
+               v := reflect.ValueOf(msg)
+               for v.Kind() == reflect.Ptr {
+                       v = reflect.Indirect(v)
+               }
+
+               return ProtobufStructReflection{
+                       descriptor:    msg.ProtoReflect().Descriptor(),
+                       message:       msg.ProtoReflect(),
+                       rValue:        v,
+                       SchemaOptions: psr.SchemaOptions,
+               }
+       } else {
+               return psr
+       }
+}
+
+func (psr ProtobufStructReflection) GetArrowFields() []arrow.Field {
+       var fields []arrow.Field
+
+       for pfr := range psr.generateStructFields() {
+               fields = append(fields, arrow.Field{
+                       Name:     
psr.fieldNameFormatter(string(pfr.descriptor.Name())),
+                       Type:     pfr.getDataType(),
+                       Nullable: true,
+               })
+       }
+
+       return fields
+}
+
+func (psr ProtobufStructReflection) GetSchema() *arrow.Schema {
+       return arrow.NewSchema(psr.GetArrowFields(), nil)
+}
+
+type ProtobufListReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pfr ProtobufFieldReflection) AsList() ProtobufListReflection {
+       return ProtobufListReflection{pfr}
+}
+
+func (plr ProtobufListReflection) getDataType() arrow.DataType {
+       for li := range plr.generateListItems() {
+               return arrow.ListOf(li.getDataType())
+       }
+       return nil
+}
+
+func (pfr ProtobufFieldReflection) AsMap() ProtobufMapReflection {
+       return ProtobufMapReflection{pfr}
+}
+
+type ProtobufMapReflection struct {
+       ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapReflection) getDataType() arrow.DataType {
+       for kvp := range pmr.generateKeyValuePairs() {
+               return kvp.getDataType()
+       }
+       return nil
+}
+
+type ProtobufMapKeyValuePairReflection struct {
+       k ProtobufFieldReflection
+       v ProtobufFieldReflection
+}
+
+func (pmr ProtobufMapKeyValuePairReflection) getDataType() arrow.DataType {
+       return arrow.MapOf(pmr.k.getDataType(), pmr.v.getDataType())
+}
+
+func (pmr ProtobufMapReflection) generateKeyValuePairs() chan 
ProtobufMapKeyValuePairReflection {
+       out := make(chan ProtobufMapKeyValuePairReflection)
+
+       go func() {
+               defer close(out)
+               for _, k := range pmr.rValue.MapKeys() {
+                       kvp := ProtobufMapKeyValuePairReflection{
+                               k: ProtobufFieldReflection{
+                                       descriptor:    pmr.descriptor.MapKey(),

Review Comment:
   Why do this concurrently via a channel as opposed to just creating a slice 
or map and returning that?



##########
go/arrow/util/protobuf_reflect.go:
##########
@@ -0,0 +1,447 @@
+package util
+
+import (
+       "fmt"
+       "github.com/apache/arrow/go/v16/arrow"
+       "github.com/apache/arrow/go/v16/arrow/array"
+       "github.com/apache/arrow/go/v16/arrow/memory"
+       "github.com/huandu/xstrings"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/types/known/anypb"
+       "reflect"
+)
+
+type SchemaOptions struct {
+       exclusionPolicy    func(pfr ProtobufFieldReflection) bool
+       fieldNameFormatter func(str string) string
+}

Review Comment:
   Since `SchemaOptions` has no exported members or methods, and we don't seem 
to return it or take it as a parameter anywhere, should we just change it so 
it's not exported anymore?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to