singh1203 commented on code in PR #335:
URL: https://github.com/apache/arrow-go/pull/335#discussion_r2018061894


##########
arrow/util/schemas/unify.go:
##########
@@ -0,0 +1,490 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schemas
+
+import (
+       "errors"
+       "fmt"
+       "slices"
+
+       "maps"
+
+       "github.com/apache/arrow-go/v18/arrow"
+)
+
+// Sentinel errors defined by this package.
+var (
+       // ErrPathNotFound is returned by treeNode.getPath when a key of the
+       // requested path has no matching child node.
+       ErrPathNotFound     = errors.New("path not found")
+       // ErrFieldTypeChanged presumably reports that a field's type differs
+       // between the schemas being unified; its use is not visible in this
+       // part of the file - TODO confirm.
+       ErrFieldTypeChanged = errors.New("type changed")
+)
+
+// schemaUnifier carries the state used by UnifySchemas while merging schema
+// trees.
+type schemaUnifier struct {
+       // old is the tree built from the first schema; UnifySchemas produces
+       // its result from this tree.
+       old            *treeNode
+       // new is the tree built from the schema currently being unified.
+       new            *treeNode
+       // typeConversion is set from UnifySchemas' promotePermissive argument.
+       typeConversion bool
+}
+
+// treeNode is one node of the tree representation of a schema. The root node
+// stands for the schema itself; the nodes below it stand for fields and for
+// the components of nested types.
+type treeNode struct {
+       // root is the root of the tree: newTreeRoot points it at the node
+       // itself and newChild copies it from the parent.
+       root         *treeNode
+       // parent is the node this node was created under; newTreeRoot leaves
+       // it unset.
+       parent       *treeNode
+       // name is the field name, or "root" for a node made by newTreeRoot.
+       name         string
+       // path is the list of names leading to this node, as computed by
+       // namePath (the root's own name is not part of it).
+       path         []string
+       // field is the arrow.Field this node represents.
+       field        arrow.Field
+       // children holds the child nodes in the order they were attached.
+       children     []*treeNode
+       // childmap indexes the child nodes by name.
+       childmap     map[string]*treeNode
+       // metadatas holds the schema metadata on a root built by
+       // newTreeFromSchema, and the field metadata on nodes that
+       // treeFromSchema creates directly from a schema's fields.
+       metadatas    arrow.Metadata
+       // index is the number of children the parent already had when this
+       // node was created (-1 for the root); depth is the parent's depth
+       // plus one (zero for the root).
+       index, depth int32
+       // err is an error recorded on this node; collectErrors joins the
+       // errors of a whole subtree.
+       err          error
+}
+
+// UnifySchemas unifies multiple schemas into a single schema. If 
promotePermissive is true, the unification process will promote integer types 
to larger integer types, integer types to floating-point types, STRING to 
LARGE_STRING, LIST to LARGE_LIST and LIST_VIEW to LARGE_LIST_VIEW. If 
promotePermissive is false, the unification process will not allow type 
conversion and will return an error if a type conflict is found.
+func UnifySchemas(promotePermissive bool, schemas ...*arrow.Schema) 
(*arrow.Schema, error) {
+       if len(schemas) < 2 {
+               return nil, fmt.Errorf("not enough schemas to unify")
+       }
+       var u schemaUnifier
+       u.typeConversion = promotePermissive
+       u.old = newTreeFromSchema(schemas[0])
+       for _, s := range schemas[1:] {
+               u.new = newTreeFromSchema(s)
+               u.unify()
+       }
+       if err := collectErrors(u.old); err != nil {
+               return nil, err
+       }
+       return u.old.schema()
+}
+
+// newTreeRoot returns a fresh root node: it is named "root", has index -1,
+// points to itself as the tree root and starts with an empty (non-nil) child
+// list and child map.
+func newTreeRoot() *treeNode {
+       top := &treeNode{
+               name:     "root",
+               index:    -1,
+               children: []*treeNode{},
+               childmap: map[string]*treeNode{},
+       }
+       top.root = top
+       return top
+}
+
+// assignChild attaches child to f: the node is appended to f's ordered child
+// list and then registered in f's lookup map under child.name.
+func (f *treeNode) assignChild(child *treeNode) {
+       extended := append(f.children, child)
+       f.children = extended
+       f.childmap[child.name] = child
+}
+
+func (f *treeNode) newChild(childName string) *treeNode {
+       var child treeNode = treeNode{
+               root:   f.root,
+               parent: f,
+               name:   childName,
+               index:  int32(len(f.children)),
+               depth:  f.depth + 1,
+       }
+       child.path = child.namePath()
+       child.childmap = make(map[string]*treeNode)
+       return &child
+}
+
+// mapChildren registers every node of f.children in f.childmap under the
+// node's name. When two children share a name, the later one wins.
+func (f *treeNode) mapChildren() {
+       for _, kid := range f.children {
+               f.childmap[kid.name] = kid
+       }
+}
+
+// getPath walks down from f following the keys in path, one child lookup per
+// key, and returns the node reached by the last key. It returns
+// ErrPathNotFound as soon as a key has no matching child, and a plain error
+// when path is empty.
+func (f *treeNode) getPath(path []string) (*treeNode, error) {
+       if len(path) == 0 { // degenerate input
+               return nil, fmt.Errorf("getPath needs at least one key")
+       }
+       cur := f
+       for _, key := range path {
+               next, found := cur.childmap[key]
+               if !found {
+                       return nil, ErrPathNotFound
+               }
+               cur = next
+       }
+       return cur, nil
+}
+
+// namePath returns a slice of keys making up the path to the field
+func (f *treeNode) namePath() []string {
+       if len(f.path) == 0 {
+               var path []string
+               cur := f
+               for i := f.depth - 1; i >= 0; i-- {
+                       path = append([]string{cur.name}, path...)
+                       cur = cur.parent
+               }
+               return path
+       }
+       return f.path
+}
+
+// dotPath returns the path to the field in json dot notation
+func (f *treeNode) dotPath() string {
+       var path string = "$"
+       for i, p := range f.path {
+               path = path + p
+               if i+1 != len(f.path) {
+                       path = path + "."
+               }
+       }
+       return path
+}
+
+// graft grafts a new field into the schema tree as a child of f.
+//
+// A new child node named after n is created under f, takes over n's field and
+// n's children, and is attached to f. When f is a non-root STRUCT node, f's
+// own field is rebuilt so that its struct type also contains the grafted
+// field (f's nullability is kept). When f's parent is a non-root LIST node,
+// the parent's field is rebuilt as a list of f's updated field.
+func (f *treeNode) graft(n *treeNode) {
+       graft := f.newChild(n.name)
+       graft.field = n.field
+       graft.children = append(graft.children, n.children...)
+       graft.mapChildren()
+       f.assignChild(graft)
+
+       // Only non-root struct nodes carry a field type that must be extended.
+       if f == f.root || f.field.Type.ID() != arrow.STRUCT {
+               return
+       }
+       st := f.field.Type.(*arrow.StructType)
+       var fields []arrow.Field
+       fields = append(fields, st.Fields()...)
+       fields = append(fields, graft.field)
+       f.field = arrow.Field{Name: f.name, Type: arrow.StructOf(fields...), Nullable: f.field.Nullable}
+
+       // Test for nil before touching the parent, and recognise the tree root
+       // by identity (a root is its own root, as in the check above) instead
+       // of by name, so a real field that happens to be called "root" still
+       // gets its list type refreshed.
+       // NOTE(review): the rebuilt parent field carries neither the parent's
+       // previous Nullable flag nor its metadata - confirm this is intended.
+       if p := f.parent; p != nil && p != p.root && p.field.Type.ID() == arrow.LIST {
+               p.field = arrow.Field{Name: p.name, Type: arrow.ListOfField(f.field)}
+       }
+}
+
+func collectErrors(f *treeNode) error {
+       var err error
+       if f.err != nil {
+               err = errors.Join(err, f.err)
+       }
+       for _, field := range f.children {
+               err = errors.Join(err, collectErrors(field))
+       }
+       return err
+}
+
+// newTreeFromSchema builds the tree for schema a: a fresh root that carries
+// the schema's metadata, populated from the schema's fields by
+// treeFromSchema.
+func newTreeFromSchema(a *arrow.Schema) *treeNode {
+       top := newTreeRoot()
+       top.metadatas = a.Metadata()
+       treeFromSchema(top, a)
+       return top
+}
+
+// treeFromSchema adds one child node to f for every field of schema a and
+// expands nested types into sub-nodes:
+//
+//   - STRUCT: one sub-node per struct field
+//   - LIST: a single "element" sub-node
+//   - MAP: "key" and "value" sub-nodes
+//   - RUN_END_ENCODED: "run_ends" and "values" sub-nodes
+//   - DICTIONARY: "index" and "dictionary" sub-nodes
+//
+// Each sub-node is expanded further by a recursive call on a single-field
+// schema that holds the sub-field. All other types produce leaf nodes.
+func treeFromSchema(f *treeNode, a *arrow.Schema) {
+       // attach creates the node for sub under parent, links it in and then
+       // expands it. Linking each node before its next sibling is created
+       // matters: newChild takes index from the number of children already
+       // attached, so creating two siblings first gives both of them index 0.
+       attach := func(parent *treeNode, sub arrow.Field) {
+               node := parent.newChild(sub.Name)
+               node.field = sub
+               parent.assignChild(node)
+               treeFromSchema(node, arrow.NewSchema([]arrow.Field{sub}, nil))
+       }
+
+       for _, field := range a.Fields() {
+               child := f.newChild(field.Name)
+               child.field = field
+               child.metadatas = field.Metadata
+
+               switch field.Type.ID() {
+               case arrow.STRUCT:
+                       structType := field.Type.(*arrow.StructType)
+                       for _, subField := range structType.Fields() {
+                               attach(child, subField)
+                       }
+               case arrow.LIST:
+                       listType := field.Type.(*arrow.ListType)
+                       attach(child, arrow.Field{Name: "element", Type: listType.Elem()})
+               case arrow.MAP:
+                       mapType := field.Type.(*arrow.MapType)
+                       attach(child, arrow.Field{Name: "key", Type: mapType.KeyType()})
+                       attach(child, arrow.Field{Name: "value", Type: mapType.ItemType()})
+               case arrow.RUN_END_ENCODED:
+                       runEndEncodedType := field.Type.(*arrow.RunEndEncodedType)
+                       attach(child, arrow.Field{Name: "run_ends", Type: runEndEncodedType.RunEnds()})
+                       attach(child, arrow.Field{Name: "values", Type: runEndEncodedType.Encoded()})
+               case arrow.DICTIONARY:
+                       dictType := field.Type.(*arrow.DictionaryType)
+                       attach(child, arrow.Field{Name: "index", Type: dictType.IndexType})
+                       attach(child, arrow.Field{Name: "dictionary", Type: dictType.ValueType})
+               default:
+                       // Scalar types do not need further processing
+               }
+
+               f.assignChild(child)
+       }
+}
+
+func (f *treeNode) schema() (*arrow.Schema, error) {
+       var s *arrow.Schema
+       defer func(s *arrow.Schema) (*arrow.Schema, error) {
+               if pErr := recover(); pErr != nil {
+                       return nil, fmt.Errorf("schema problem: %v", pErr)
+               }
+               return s, nil
+       }(s)

Review Comment:
   I have a question: since `s` is passed as an argument to the deferred 
function, isn't its value captured as `nil` at the moment the `defer` statement 
is evaluated? Also, is passing `s` to the deferred function the right approach 
here, or should we rely on a closure instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to