zeroshade commented on code in PR #36796:
URL: https://github.com/apache/arrow/pull/36796#discussion_r1281093354


##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}

Review Comment:
   Either use a separate config struct or just use the `OCFReader` as the 
config type; don't bother with it being an interface and doing the type 
switch. I don't think there's any benefit to doing it this way.
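
   A minimal sketch of the suggested shape, assuming the options only ever 
configure `OCFReader` (names are illustrative, not the PR's final API):

   ```go
   // Option configures an Avro OCF reader.
   type Option func(*OCFReader)

   // WithChunk sets the chunk size. Operating on *OCFReader directly needs
   // no interface and no type switch, and misuse becomes a compile-time
   // error instead of a runtime panic.
   func WithChunk(n int) Option {
           return func(r *OCFReader) { r.chunk = n }
   }

   // WithAllocator sets the memory allocator the same way.
   func WithAllocator(mem memory.Allocator) Option {
           return func(r *OCFReader) { r.mem = mem }
   }
   ```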



##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}
+
+// WithTopLevel specifies whether to include an Avro schema's top level record
+// or only its fields.
+func WithTopLevel(b bool) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.topLevel = b
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building records.
+func WithAllocator(mem memory.Allocator) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.mem = mem
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole Avro OCF file into memory and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.chunk = n
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// DefaultNullValues is the set of values considered as NULL values by default
+// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"}
+
+// WithNullReader sets options for a CSV Reader pertaining to NULL value
+// handling. If stringsCanBeNull is true, then a string that matches one of the
+// nullValues set will be interpreted as NULL. Numeric columns will be checked
+// for nulls in all cases. If no nullValues arguments are passed in, the
+// defaults set in NewReader() will be kept.
+//
+// When no NULL values are given, the default set is taken from DefaultNullValues.
+func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.stringsCanBeNull = stringsCanBeNull
+
+                       if len(nullValues) == 0 {
+                               nullValues = DefaultNullValues
+                       }
+                       cfg.nulls = make([]string, len(nullValues))
+                       copy(cfg.nulls, nullValues)
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}

Review Comment:
   See previous comment. Doesn't Avro have a concept of a null value already, 
so we don't need the "null reader" machinery required for a CSV or other 
text-based reader? (For example, note that there's nothing like this in the 
JSON reader: if a value needs to be null, it should be a JSON null.)
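
   For reference, nullability in Avro is declared in the schema itself as a 
union with "null", so a decoder already produces real nulls; no sentinel 
strings are involved. A made-up example schema:

   ```go
   // An Avro field is nullable when its type is a union containing "null".
   // The decoder yields an actual null for such a field, which maps directly
   // to a null slot in the Arrow array, unlike CSV where "" or "NULL" must
   // be matched by convention.
   const exampleSchema = `{
     "type": "record",
     "name": "Example",
     "fields": [
       {"name": "id", "type": "long"},
       {"name": "note", "type": ["null", "string"], "default": null}
     ]
   }`
   ```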



##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}
+
+// WithTopLevel specifies whether to include an Avro schema's top level record
+// or only its fields.
+func WithTopLevel(b bool) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.topLevel = b
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building records.
+func WithAllocator(mem memory.Allocator) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.mem = mem
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole Avro OCF file into memory and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.chunk = n
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// DefaultNullValues is the set of values considered as NULL values by default
+// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"}
+
+// WithNullReader sets options for a CSV Reader pertaining to NULL value
+// handling. If stringsCanBeNull is true, then a string that matches one of the
+// nullValues set will be interpreted as NULL. Numeric columns will be checked
+// for nulls in all cases. If no nullValues arguments are passed in, the
+// defaults set in NewReader() will be kept.
+//
+// When no NULL values are given, the default set is taken from DefaultNullValues.
+func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.stringsCanBeNull = stringsCanBeNull
+
+                       if len(nullValues) == 0 {
+                               nullValues = DefaultNullValues
+                       }
+                       cfg.nulls = make([]string, len(nullValues))
+                       copy(cfg.nulls, nullValues)
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg))
+               }
+       }
+}
+
+// WithColumnTypes allows specifying optional per-column types (disabling
+// type inference on those columns).
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithColumnTypes(types map[string]arrow.DataType) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid))
+                       }
+                       cfg.columnTypes = types
+               default:
+                       panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid))
+               }
+       }
+}
+
+// WithIncludeColumns indicates the names of the columns from the AVRO file
+// that should actually be read and converted (in the slice's order).
+// If set and non-empty, columns not in this slice will be ignored.
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithIncludeColumns(cols []string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use WithIncludeColumns with explicit schema", arrow.ErrInvalid))
+                       }
+                       cfg.columnFilter = cols
+               default:
+                       panic(fmt.Errorf("%w: WithIncludeColumns only allowed on csv Reader", arrow.ErrInvalid))
+               }
+       }
+}
+
+func validate(schema *arrow.Schema) {
+       for i, f := range schema.Fields() {
+               switch ft := f.Type.(type) {
+               case *arrow.BooleanType:
+               case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
+               case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type:
+               case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type:
+               case *arrow.StringType, *arrow.LargeStringType:
+               case *arrow.TimestampType:
+               case *arrow.Date32Type, *arrow.Date64Type:
+               case *arrow.Decimal128Type, *arrow.Decimal256Type:
+               case *arrow.ListType, *arrow.LargeListType, *arrow.FixedSizeListType:
+               case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType:
+               case arrow.ExtensionType:
+               case *arrow.NullType:
+               default:
+                       panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft))
+               }
+       }
+}

Review Comment:
   Should we even allow specifying an Arrow schema rather than performing the 
conversion?



##########
go/arrow/avro/schema.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strconv"
+
+       "github.com/apache/arrow/go/v13/arrow"
+)
+
+type schemaNode struct {
+       name        string
+       ofType      interface{}
+       logicalType string
+       precision   int32
+       scale       int32
+       size        int
+       symbols     []string
+       fields      []interface{}
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON.
+// If the top level is of record type, set includeTopLevel to either make
+// its fields top level fields in the resulting schema or nested in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) {
+       var m map[string]interface{}
+       var node schemaNode
+       json.Unmarshal(avroSchema, &m)
+       if m["type"].(string) == "record" {
+               if _, ok := m["fields"]; ok {
+                       for _, field := range m["fields"].([]interface{}) {
+                               node.fields = append(node.fields, field.(map[string]interface{}))
+                       }
+                       if len(node.fields) == 0 {
+                               return nil, fmt.Errorf("invalid avro schema: no top level record fields found")
+                       }
+               }
+       }
+       fields := iterateFields(node.fields)
+       return arrow.NewSchema(fields, nil), nil
+}

Review Comment:
   Avro has a well-defined spec for its JSON schema, so why not create a 
struct to represent it and use `json` struct tags plus `UnmarshalJSON` / 
`MarshalJSON` methods to simplify the unmarshalling?
   
   Even better, https://pkg.go.dev/github.com/hamba/avro/v2#RecordSchema has 
a Parse function that returns objects representing the schema for you, which 
would likely let you greatly simplify the `iterateFields` code to just 
iterate the objects produced by the parse.
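
   A rough sketch of that approach, assuming hamba/avro's `Parse` / 
`RecordSchema` API; `avroToArrowType` is a hypothetical stand-in for the 
type-conversion logic this PR already implements:

   ```go
   import (
           "fmt"

           "github.com/apache/arrow/go/v13/arrow"
           hamba "github.com/hamba/avro/v2"
   )

   func arrowSchemaFromAvro(schemaJSON string) (*arrow.Schema, error) {
           s, err := hamba.Parse(schemaJSON)
           if err != nil {
                   return nil, err
           }
           rec, ok := s.(*hamba.RecordSchema)
           if !ok {
                   return nil, fmt.Errorf("arrow/avro: top-level schema must be a record")
           }
           fields := make([]arrow.Field, 0, len(rec.Fields()))
           for _, f := range rec.Fields() {
                   // Walk the parsed avro.Schema objects rather than raw
                   // map[string]interface{} values.
                   dt, err := avroToArrowType(f.Type())
                   if err != nil {
                           return nil, err
                   }
                   fields = append(fields, arrow.Field{Name: f.Name(), Type: dt})
           }
           return arrow.NewSchema(fields, nil), nil
   }
   ```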



##########
go/arrow/avro/schema.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strconv"
+
+       "github.com/apache/arrow/go/v13/arrow"
+)
+
+type schemaNode struct {
+       name        string
+       ofType      interface{}
+       logicalType string
+       precision   int32
+       scale       int32
+       size        int
+       symbols     []string
+       fields      []interface{}
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON.
+// If the top level is of record type, set includeTopLevel to either make
+// its fields top level fields in the resulting schema or nested in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) {
+       var m map[string]interface{}
+       var node schemaNode
+       json.Unmarshal(avroSchema, &m)
+       if m["type"].(string) == "record" {
+               if _, ok := m["fields"]; ok {
+                       for _, field := range m["fields"].([]interface{}) {
+                               node.fields = append(node.fields, field.(map[string]interface{}))
+                       }
+                       if len(node.fields) == 0 {
+                               return nil, fmt.Errorf("invalid avro schema: no top level record fields found")
+                       }
+               }
+       }
+       fields := iterateFields(node.fields)
+       return arrow.NewSchema(fields, nil), nil
+}
+
+func iterateFields(f []interface{}) []arrow.Field {
+       var s []arrow.Field
+       for _, field := range f {
+               var n schemaNode
+               n.name = field.(map[string]interface{})["name"].(string)
+               n.ofType = field.(map[string]interface{})["type"]
+               switch n.ofType.(type) {
+               // Getting field type from within field object
+               case string:
+                       switch n.ofType.(string) {
+                       case "enum":
+                               for _, symbol := range field.(map[string]interface{})["symbols"].([]interface{}) {
+                                       n.symbols = append(n.symbols, symbol.(string))
+                               }
+                       default:
+                               if lt, ok := field.(map[string]interface{})["logicalType"]; ok {
+                                       n.logicalType = lt.(string)
+                               }
+                               if lt, ok := field.(map[string]interface{})["size"]; ok {
+                                       n.size = int(lt.(float64))
+                               }
+                               if lt, ok := field.(map[string]interface{})["precision"]; ok {
+                                       n.precision = int32(lt.(float64))
+                               }
+                               if lt, ok := field.(map[string]interface{})["scale"]; ok {
+                                       n.scale = int32(lt.(float64))
+                               }
+                       }
+               // Field type is an object
+               case map[string]interface{}:
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               case []interface{}:
+
+               default:
+                       if lt, ok := field.(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               }
+               // Field is of type "record"
+               if nf, f := field.(map[string]interface{})["fields"]; f {
+                       switch nf.(type) {
+                       // primitive & complex types
+                       case map[string]interface{}:
+                               for _, v := range nf.(map[string]interface{})["fields"].([]interface{}) {
+                                       n.fields = append(n.fields, v.(map[string]interface{}))
+                               }
+                       // type unions
+                       default:
+                               for _, v := range nf.([]interface{}) {
+                                       n.fields = append(n.fields, v.(map[string]interface{}))
+                               }
+                       }
+               }
+               s = append(s, traverseNodes(n))
+       }
+       return s
+}
+
+func traverseNodes(node schemaNode) arrow.Field {
+       switch node.ofType.(type) {
+       case string:
+               // Avro primitive type
+               if len(node.fields) == 0 {
+                       switch node.ofType.(string) {
+                       case "boolean", "int", "long", "float", "double", "bytes", "string":
+                               if node.logicalType != "" {
+                                       return avroLogicalToArrowField(node)
+                               }
+                               //  Avro primitive type
+                               return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))}
+                       case "fixed":
+                               // Duration type is not supported in github.com/linkedin/goavro
+                               // Implementing as Binary for now.
+                               switch node.logicalType {
+                               case "decimal":
+                                       return avroLogicalToArrowField(node)
+                               case "duration":
+                                       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+                                       //return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.MonthDayNanoInterval}
+                               }
+                               return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: node.size}}
+                       case "enum":
+                               symbols := make(map[string]string)
+                               for index, symbol := range node.symbols {
+                                       k := strconv.FormatInt(int64(index), 10)
+                                       symbols[k] = symbol
+                               }
+                               var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false}
+                               sl := len(symbols)
+                               switch {
+                               case sl <= math.MaxUint8:
+                                       dt.IndexType = arrow.PrimitiveTypes.Uint8
+                               case sl > math.MaxUint8 && sl <= math.MaxUint16:
+                                       dt.IndexType = arrow.PrimitiveTypes.Uint16
+                               case sl > math.MaxUint16 && sl <= math.MaxUint32:
+                                       dt.IndexType = arrow.PrimitiveTypes.Uint32
+                               }
+                               return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
+                       default:
+                               return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))}
+                       }
+               } else {
+                       // avro "record" type, node has "fields" array
+                       if node.ofType.(string) == "record" {
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = node.ofType
+                               if len(node.fields) > 0 {
+                                       n.fields = append(n.fields, node.fields...)
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)}
+                       }
+               }
+       // Avro complex types
+       case map[string]interface{}:
+               //var n schemaNode
+               //n.name = node.name
+               //n.ofType = node.ofType.(map[string]interface{})["type"]
+               switch node.logicalType {
+               case "":
+                       return avroComplexToArrowField(node)
+               default:
+                       return avroLogicalToArrowField(node)
+               }
+
+       // Avro union types
+       case []interface{}:
+               var unionTypes []string
+               for _, ft := range node.ofType.([]interface{}) {
+                       switch ft.(type) {
+                       // primitive types
+                       case string:
+                               if ft != "null" {
+                                       unionTypes = append(unionTypes, ft.(string))
+                               }
+                               continue
+                       // complex types
+                       case map[string]interface{}:
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = ft.(map[string]interface{})["type"]
+                               if _, f := ft.(map[string]interface{})["fields"]; f {
+                                       for _, field := range ft.(map[string]interface{})["fields"].([]interface{}) {
+                                               n.fields = append(n.fields, field.(map[string]interface{}))
+                                       }
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)}
+                       }
+               }
+               // Supported Avro union type is null + one other type.
+               // TODO: Complex AVRO union to Arrow Dense || Sparse Union.
+               if len(unionTypes) == 1 {
+                       return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(unionTypes[0])}
+               } else {
+                       // BYTE_ARRAY is the catchall if union type is anything beyond null + one other type.
+                       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+               }
+       }
+       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+}
+
+func avroLogicalToArrowField(node schemaNode) arrow.Field {
+       // Avro logical types
+       switch node.logicalType {
+       // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10^-scale.
+       // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement
+       // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified
+       // using an attribute.
+       //
+       // The following attributes are supported:
+       // scale, a JSON integer representing the scale (optional). If not specified the scale is 0.
+       // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required).
+       case "decimal":
+               if node.precision <= 38 {
+                       return arrow.Field{Name: node.name, Type: &arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}}
+               } else {
+                       return arrow.Field{Name: node.name, Type: &arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}}
+               }
+
+       // The uuid logical type represents a random generated universally unique identifier (UUID).
+       // A uuid logical type annotates an Avro string. The string has to conform with RFC-4122
+       case "uuid":
+               return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.String}
+
+       // The date logical type represents a date within the calendar, with no reference to a particular
+       // time zone or time of day.
+       // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch,
+       // 1 January 1970 (ISO calendar).
+       case "date":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Date32}
+
+       // The time-millis logical type represents a time of day, with no reference to a particular calendar,
+       // time zone or date, with a precision of one millisecond.
+       // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds
+       // after midnight, 00:00:00.000.
+       case "time-millis":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time32ms}
+
+       // The time-micros logical type represents a time of day, with no reference to a particular calendar,
+       // time zone or date, with a precision of one microsecond.
+       // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds
+       // after midnight, 00:00:00.000000.
+       case "time-micros":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time64us}
+
+       // The timestamp-millis logical type represents an instant on the global timeline, independent of a
+       // particular time zone or calendar, with a precision of one millisecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps are typically displayed to users in
+       // their local time zones, therefore they may be displayed differently depending on the execution environment.
+       // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+       case "timestamp-millis":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms}
+       // The timestamp-micros logical type represents an instant on the global timeline, independent of a
+       // particular time zone or calendar, with a precision of one microsecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps are typically displayed to users
+       // in their local time zones, therefore they may be displayed differently depending on the execution environment.
+       // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+       case "timestamp-micros":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us}
+       // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of
+       // what specific time zone is considered local, with a precision of one millisecond.
+       // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of
+       // milliseconds, from 1 January 1970 00:00:00.000.
+       case "local-timestamp-millis":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms}
+       // The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of
+       // what specific time zone is considered local, with a precision of one microsecond.
+       // A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of
+       // microseconds, from 1 January 1970 00:00:00.000000.
+       case "local-timestamp-micros":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us}
+       //  Avro primitive type
+       default:
+               return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))}
+       }
+       return arrow.Field{}

Review Comment:
   shouldn't this be an error?
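
   One possible shape, as a sketch: the unreachable `return arrow.Field{}` 
becomes an explicit error wrapping `arrow.ErrNotImplemented` (assumes 
callers are updated to propagate it; `fmt` and `arrow` are imported as in 
the file above):

   ```go
   func avroLogicalToArrowField(node schemaNode) (arrow.Field, error) {
           switch node.logicalType {
           case "date":
                   return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Date32}, nil
           // ... the remaining logical types as above, each returning a nil error ...
           default:
                   return arrow.Field{}, fmt.Errorf("%w: avro logical type %q",
                           arrow.ErrNotImplemented, node.logicalType)
           }
   }
   ```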



##########
go/arrow/avro/schema.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strconv"
+
+       "github.com/apache/arrow/go/v13/arrow"
+)
+
+type schemaNode struct {
+       name        string
+       ofType      interface{}
+       logicalType string
+       precision   int32
+       scale       int32
+       size        int
+       symbols     []string
+       fields      []interface{}
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON.
+// If the top level is of record type, set includeTopLevel to either make
+// its fields top level fields in the resulting schema or nested in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) {
+       var m map[string]interface{}
+       var node schemaNode
+       json.Unmarshal(avroSchema, &m)
+       if m["type"].(string) == "record" {
+               if _, ok := m["fields"]; ok {
+                       for _, field := range m["fields"].([]interface{}) {
+                               node.fields = append(node.fields, field.(map[string]interface{}))
+                       }
+                       if len(node.fields) == 0 {
+                               return nil, fmt.Errorf("invalid avro schema: no top level record fields found")
+                       }
+               }
+       }
+       fields := iterateFields(node.fields)
+       return arrow.NewSchema(fields, nil), nil
+}
+
+func iterateFields(f []interface{}) []arrow.Field {
+       var s []arrow.Field
+       for _, field := range f {
+               var n schemaNode
+               n.name = field.(map[string]interface{})["name"].(string)
+               n.ofType = field.(map[string]interface{})["type"]
+               switch n.ofType.(type) {
+               // Getting field type from within field object
+               case string:
+                       switch n.ofType.(string) {
+                       case "enum":
+                               for _, symbol := range field.(map[string]interface{})["symbols"].([]interface{}) {
+                                       n.symbols = append(n.symbols, symbol.(string))
+                               }
+                       default:
+                               if lt, ok := field.(map[string]interface{})["logicalType"]; ok {
+                                       n.logicalType = lt.(string)
+                               }
+                               if lt, ok := field.(map[string]interface{})["size"]; ok {
+                                       n.size = int(lt.(float64))
+                               }
+                               if lt, ok := field.(map[string]interface{})["precision"]; ok {
+                                       n.precision = int32(lt.(float64))
+                               }
+                               if lt, ok := field.(map[string]interface{})["scale"]; ok {
+                                       n.scale = int32(lt.(float64))
+                               }
+                       }
+               // Field type is an object
+               case map[string]interface{}:
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               case []interface{}:
+
+               default:
+                       if lt, ok := field.(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               }
+               // Field is of type "record"
+               if nf, f := field.(map[string]interface{})["fields"]; f {
+                       switch nf.(type) {
+                       // primitive & complex types
+                       case map[string]interface{}:
+                               for _, v := range nf.(map[string]interface{})["fields"].([]interface{}) {
+                                       n.fields = append(n.fields, v.(map[string]interface{}))
+                               }
+                       // type unions
+                       default:
+                               for _, v := range nf.([]interface{}) {
+                                       n.fields = append(n.fields, v.(map[string]interface{}))
+                               }
+                       }
+               }
+               s = append(s, traverseNodes(n))
+       }
+       return s
+}
+
+func traverseNodes(node schemaNode) arrow.Field {
+       switch node.ofType.(type) {
+       case string:
+               // Avro primitive type
+               if len(node.fields) == 0 {
+                       switch node.ofType.(string) {
+                       case "boolean", "int", "long", "float", "double", "bytes", "string":
+                               if node.logicalType != "" {
+                                       return avroLogicalToArrowField(node)
+                               }
+                               //  Avro primitive type
+                               return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))}
+                       case "fixed":
+                               // Duration type is not supported in github.com/linkedin/goavro
+                               // Implementing as Binary for now.
+                               switch node.logicalType {
+                               case "decimal":
+                                       return avroLogicalToArrowField(node)
+                               case "duration":
+                                       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+                                       //return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.MonthDayNanoInterval}
+                               }
+                               return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: node.size}}
+                       case "enum":
+                               symbols := make(map[string]string)
+                               for index, symbol := range node.symbols {
+                                       k := strconv.FormatInt(int64(index), 10)
+                                       symbols[k] = symbol
+                               }
+                               var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false}
+                               sl := len(symbols)
+                               switch {
+                               case sl <= math.MaxUint8:
+                                       dt.IndexType = arrow.PrimitiveTypes.Uint8
+                               case sl > math.MaxUint8 && sl <= math.MaxUint16:
+                                       dt.IndexType = arrow.PrimitiveTypes.Uint16
+                               case sl > math.MaxUint16 && sl <= math.MaxUint32:
+                                       dt.IndexType = arrow.PrimitiveTypes.Uint32
+                               }
+                               return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
+                       default:
+                               return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))}
+                       }
+               } else {
+                       // avro "record" type, node has "fields" array
+                       if node.ofType.(string) == "record" {
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = node.ofType
+                               if len(node.fields) > 0 {
+                                       n.fields = append(n.fields, node.fields...)
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)}
+                       }
+               }
+       // Avro complex types
+       case map[string]interface{}:
+               //var n schemaNode
+               //n.name = node.name
+               //n.ofType = node.ofType.(map[string]interface{})["type"]
+               switch node.logicalType {
+               case "":
+                       return avroComplexToArrowField(node)
+               default:
+                       return avroLogicalToArrowField(node)
+               }
+
+       // Avro union types
+       case []interface{}:
+               var unionTypes []string
+               for _, ft := range node.ofType.([]interface{}) {
+                       switch ft.(type) {
+                       // primitive types
+                       case string:
+                               if ft != "null" {
+                                       unionTypes = append(unionTypes, ft.(string))
+                               }
+                               continue
+                       // complex types
+                       case map[string]interface{}:
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = ft.(map[string]interface{})["type"]
+                               if _, f := ft.(map[string]interface{})["fields"]; f {
+                                       for _, field := range ft.(map[string]interface{})["fields"].([]interface{}) {
+                                               n.fields = append(n.fields, field.(map[string]interface{}))
+                                       }
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)}
+                       }
+               }
+               // Supported Avro union type is null + one other type.
+               // TODO: Complex AVRO union to Arrow Dense || Sparse Union.
+               if len(unionTypes) == 1 {
+                       return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(unionTypes[0])}
+               } else {
+                       // BYTE_ARRAY is the catchall if union type is anything beyond null + one other type.
+                       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+               }
+       }
+       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+}
+
+func avroLogicalToArrowField(node schemaNode) arrow.Field {
+       // Avro logical types
+       switch node.logicalType {
+       // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10^-scale.
+       // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement
+       // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified
+       // using an attribute.
+       //
+       // The following attributes are supported:
+       // scale, a JSON integer representing the scale (optional). If not specified the scale is 0.
+       // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required).
+       case "decimal":
+               if node.precision <= 38 {
+                       return arrow.Field{Name: node.name, Type: &arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}}
+               } else {
+                       return arrow.Field{Name: node.name, Type: &arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}}
+               }
+
+       // The uuid logical type represents a random generated universally unique identifier (UUID).
+       // A uuid logical type annotates an Avro string. The string has to conform with RFC-4122
+       case "uuid":
+               return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.String}
+
+       // The date logical type represents a date within the calendar, with no reference to a particular
+       // time zone or time of day.
+       // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch,
+       // 1 January 1970 (ISO calendar).
+       case "date":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Date32}
+
+       // The time-millis logical type represents a time of day, with no reference to a particular calendar,
+       // time zone or date, with a precision of one millisecond.
+       // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds
+       // after midnight, 00:00:00.000.
+       case "time-millis":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time32ms}
+
+       // The time-micros logical type represents a time of day, with no reference to a particular calendar,
+       // time zone or date, with a precision of one microsecond.
+       // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds
+       // after midnight, 00:00:00.000000.
+       case "time-micros":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time64us}
+
+       // The timestamp-millis logical type represents an instant on the global timeline, independent of a
+       // particular time zone or calendar, with a precision of one millisecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps are typically displayed to users in
+       // their local time zones, therefore they may be displayed differently depending on the execution environment.
+       // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+       case "timestamp-millis":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms}
+       // The timestamp-micros logical type represents an instant on the global timeline, independent of a
+       // particular time zone or calendar, with a precision of one microsecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps are typically displayed to users
+       // in their local time zones, therefore they may be displayed differently depending on the execution environment.
+       // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+       case "timestamp-micros":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us}
+       // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of
+       // what specific time zone is considered local, with a precision of one millisecond.
+       // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of
+       // milliseconds, from 1 January 1970 00:00:00.000.
+       case "local-timestamp-millis":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms}
+       // The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of
+       // what specific time zone is considered local, with a precision of one microsecond.
+       // A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of
+       // microseconds, from 1 January 1970 00:00:00.000000.
+       case "local-timestamp-micros":
+               return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us}
+       //  Avro primitive type
+       default:
+               return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))}
+       }
+       return arrow.Field{}
+}
+
+func avroComplexToArrowField(node schemaNode) arrow.Field {
+       var n schemaNode
+       n.name = node.name
+       n.ofType = node.ofType.(map[string]interface{})["type"]
+       // Avro "array" field type
+       if i, ok := node.ofType.(map[string]interface{})["items"]; ok {
+               switch i.(string) {
+               case "int", "long", "float", "double", "bytes", "boolean", "string":
+                       return arrow.Field{Name: node.name, Type: arrow.ListOf(AvroPrimitiveToArrowType(i.(string)))}
+               case "enum", "fixed", "map", "record", "array":
+                       return arrow.Field{Name: node.name, Type: arrow.ListOf(avroComplexToArrowField(n).Type)}
+               case "decimal", "uuid", "date", "time-millis", "time-micros", "timestamp-millis", "timestamp-micros", "local-timestamp-millis", "local-timestamp-micros":
+                       return arrow.Field{Name: node.name, Type: arrow.ListOf(avroLogicalToArrowField(n).Type)}
+               }
+       }
+       // Avro "enum" field type = Arrow dictionary type
+       if i, ok := node.ofType.(map[string]interface{})["symbols"]; ok {
+               symbols := make(map[string]string)
+               for index, symbol := range i.([]interface{}) {
+                       k := strconv.FormatInt(int64(index), 10)
+                       symbols[k] = symbol.(string)
+               }
+               var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false}
+               sl := len(symbols)
+               switch {
+               case sl <= math.MaxUint8:
+                       dt.IndexType = arrow.PrimitiveTypes.Uint8
+               case sl > math.MaxUint8 && sl <= math.MaxUint16:
+                       dt.IndexType = arrow.PrimitiveTypes.Uint16
+               case sl > math.MaxUint16 && sl <= math.MaxUint32:
+                       dt.IndexType = arrow.PrimitiveTypes.Uint32
+               }
+               return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
+       }
+       // Avro "fixed" field type = Arrow FixedSize Primitive BinaryType
+       if i, ok := node.ofType.(map[string]interface{})["size"]; ok {
+               return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: int(i.(float64))}}
+       }
+       // Avro "map" field type
+       if i, ok := node.ofType.(map[string]interface{})["values"]; ok {
+               return arrow.Field{Name: node.name, Type: arrow.MapOf(arrow.BinaryTypes.String, AvroPrimitiveToArrowType(i.(string)))}
+       }
+       // Avro "record" field type
+       if _, f := node.ofType.(map[string]interface{})["fields"]; f {
+               for _, field := range node.ofType.(map[string]interface{})["fields"].([]interface{}) {
+                       n.fields = append(n.fields, field.(map[string]interface{}))
+               }
+               s := iterateFields(n.fields)
+               return arrow.Field{Name: n.name, Type: arrow.StructOf(s...)}
+       }
+       return arrow.Field{}
+}
+
+// AvroPrimitiveToArrowType returns the Arrow DataType equivalent to a
+// Avro primitive type.
+//
+// NOTE: Arrow Binary type is used as a catchall to avoid potential data loss.
+func AvroPrimitiveToArrowType(avroFieldType string) arrow.DataType {
+       switch avroFieldType {
+       // int: 32-bit signed integer
+       case "int":
+               return arrow.PrimitiveTypes.Int32
+       // long: 64-bit signed integer
+       case "long":
+               return arrow.PrimitiveTypes.Int64
+       // float: single precision (32-bit) IEEE 754 floating-point number
+       case "float":
+               return arrow.PrimitiveTypes.Float32
+       // double: double precision (64-bit) IEEE 754 floating-point number
+       case "double":
+               return arrow.PrimitiveTypes.Float64
+       // bytes: sequence of 8-bit unsigned bytes
+       case "bytes":
+               return arrow.BinaryTypes.Binary
+       // boolean: a binary value
+       case "boolean":
+               return arrow.FixedWidthTypes.Boolean
+       // string: unicode character sequence
+       case "string":
+               return arrow.BinaryTypes.String
+       // fallback to binary type for any unsupported type
+       default:
+               return arrow.BinaryTypes.Binary

Review Comment:
   I'd prefer an error with `arrow.ErrNotImplemented` for any unsupported type
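
   A minimal sketch of what that could look like, assuming the rest of the
   switch stays as-is and only the Binary catchall changes:

   ```go
   // Sketch: same mapping, but unsupported types surface an error
   // (uses "fmt" plus arrow.ErrNotImplemented from the arrow package).
   func AvroPrimitiveToArrowType(avroFieldType string) (arrow.DataType, error) {
       switch avroFieldType {
       case "int":
           return arrow.PrimitiveTypes.Int32, nil
       case "long":
           return arrow.PrimitiveTypes.Int64, nil
       case "float":
           return arrow.PrimitiveTypes.Float32, nil
       case "double":
           return arrow.PrimitiveTypes.Float64, nil
       case "bytes":
           return arrow.BinaryTypes.Binary, nil
       case "boolean":
           return arrow.FixedWidthTypes.Boolean, nil
       case "string":
           return arrow.BinaryTypes.String, nil
       default:
           return nil, fmt.Errorf("%w: avro primitive type %q", arrow.ErrNotImplemented, avroFieldType)
       }
   }
   ```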



##########
go/go.mod:
##########
@@ -59,6 +59,7 @@ require (
        github.com/golang/protobuf v1.5.2 // indirect
        github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
        github.com/kr/text v0.2.0 // indirect
+       github.com/linkedin/goavro/v2 v2.12.0 // indirect

Review Comment:
   Might I suggest `github.com/hamba/avro/v2/` as it seems to be about an order of magnitude faster than goavro...?
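
   For reference, a rough sketch of OCF decoding with hamba/avro (the names
   `ocf.NewDecoder`, `HasNext`, and `Decode` are from my reading of
   hamba/avro v2; treat the exact API as an assumption):

   ```go
   import (
       "os"

       "github.com/hamba/avro/v2/ocf"
   )

   func readOCF(path string) error {
       f, err := os.Open(path)
       if err != nil {
           return err
       }
       defer f.Close()

       // The decoder reads the OCF header, which carries the writer schema.
       dec, err := ocf.NewDecoder(f)
       if err != nil {
           return err
       }
       for dec.HasNext() {
           var datum map[string]interface{}
           if err := dec.Decode(&datum); err != nil {
               return err
           }
           // feed datum into the Arrow record builder here
       }
       return nil
   }
   ```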



##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}
+
+// WithTopLevel specifies whether to include an Avro schema's top level record
+// or only its fields.
+func WithTopLevel(b bool) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.topLevel = b
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building 
records.
+func WithAllocator(mem memory.Allocator) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.mem = mem
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole Avro OCF file into memory 
and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.chunk = n
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// DefaultNullValues is the set of values considered as NULL values by default
+// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"}
+
+// WithNullReader sets options for the OCF Reader pertaining to NULL value
+// handling. If stringsCanBeNull is true, then a string that matches one of the
+// nullValues set will be interpreted as NULL. Numeric columns will be checked
+// for nulls in all cases. If no nullValues arguments are passed in, the
+// defaults set in NewReader() will be kept.
+//
+// When no NULL values are given, the default set is taken from DefaultNullValues.
+func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.stringsCanBeNull = stringsCanBeNull
+
+                       if len(nullValues) == 0 {
+                               nullValues = DefaultNullValues
+                       }
+                       cfg.nulls = make([]string, len(nullValues))
+                       copy(cfg.nulls, nullValues)
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithColumnTypes allows specifying optional per-column types (disabling
+// type inference on those columns).
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithColumnTypes(types map[string]arrow.DataType) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use 
WithColumnTypes with explicit schema", arrow.ErrInvalid))
+                       }
+                       cfg.columnTypes = types
+               default:
+                       panic(fmt.Errorf("%w: WithColumnTypes only allowed for 
csv reader", arrow.ErrInvalid))
+               }
+       }
+}
+
+// WithIncludeColumns indicates the names of the columns from the AVRO file
+// that should actually be read and converted (in the slice's order).
+// If set and non-empty, columns not in this slice will be ignored.
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithIncludeColumns(cols []string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use 
WithIncludeColumns with explicit schema", arrow.ErrInvalid))
+                       }
+                       cfg.columnFilter = cols
+               default:
+                       panic(fmt.Errorf("%w: WithIncludeColumns only allowed 
on csv Reader", arrow.ErrInvalid))
+               }
+       }
+}

Review Comment:
   I believe Avro isn't order-based; it's name- and field-based, which means you can probably get away with reading this into a `map[string]struct{}` to create a "set" of fields.



##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}
+
+// WithTopLevel specifies whether to include an Avro schema's top level record
+// or only its fields.
+func WithTopLevel(b bool) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.topLevel = b
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building 
records.
+func WithAllocator(mem memory.Allocator) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.mem = mem
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole Avro OCF file into memory 
and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.chunk = n
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// DefaultNullValues is the set of values considered as NULL values by default
+// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"}
+
+// WithNullReader sets options for the OCF Reader pertaining to NULL value
+// handling. If stringsCanBeNull is true, then a string that matches one of the
+// nullValues set will be interpreted as NULL. Numeric columns will be checked
+// for nulls in all cases. If no nullValues arguments are passed in, the
+// defaults set in NewReader() will be kept.
+//
+// When no NULL values are given, the default set is taken from DefaultNullValues.
+func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.stringsCanBeNull = stringsCanBeNull
+
+                       if len(nullValues) == 0 {
+                               nullValues = DefaultNullValues
+                       }
+                       cfg.nulls = make([]string, len(nullValues))
+                       copy(cfg.nulls, nullValues)
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithColumnTypes allows specifying optional per-column types (disabling
+// type inference on those columns).
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithColumnTypes(types map[string]arrow.DataType) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use 
WithColumnTypes with explicit schema", arrow.ErrInvalid))
+                       }
+                       cfg.columnTypes = types
+               default:
+                       panic(fmt.Errorf("%w: WithColumnTypes only allowed for 
csv reader", arrow.ErrInvalid))
+               }
+       }
+}

Review Comment:
   We don't need type inference since Avro has an explicitly typed schema, so this is entirely unnecessary. We should just define what the mapping between Avro types and Arrow types is.
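
   i.e. a fixed lookup table rather than inference; a sketch for the
   primitive types, mirroring what `AvroPrimitiveToArrowType` already does:

   ```go
   // One-to-one mapping from Avro primitive type names to Arrow types.
   var avroPrimitiveToArrow = map[string]arrow.DataType{
       "boolean": arrow.FixedWidthTypes.Boolean,
       "int":     arrow.PrimitiveTypes.Int32,
       "long":    arrow.PrimitiveTypes.Int64,
       "float":   arrow.PrimitiveTypes.Float32,
       "double":  arrow.PrimitiveTypes.Float64,
       "bytes":   arrow.BinaryTypes.Binary,
       "string":  arrow.BinaryTypes.String,
   }
   ```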



##########
go/arrow/avro/schema.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strconv"
+
+       "github.com/apache/arrow/go/v13/arrow"
+)
+
+type schemaNode struct {
+       name        string
+       ofType      interface{}
+       logicalType string
+       precision   int32
+       scale       int32
+       size        int
+       symbols     []string
+       fields      []interface{}
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON.
+// If the top level is a record type, use the WithTopLevel option to either make
+// its fields top-level fields in the resulting schema or nest them in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) {
+       var m map[string]interface{}
+       var node schemaNode
+       json.Unmarshal(avroSchema, &m)
+       if m["type"].(string) == "record" {
+               if _, ok := m["fields"]; ok {
+                       for _, field := range m["fields"].([]interface{}) {
+                               node.fields = append(node.fields, 
field.(map[string]interface{}))
+                       }
+                       if len(node.fields) == 0 {
+                               return nil, fmt.Errorf("invalid avro schema: no top level record fields found")
+                       }
+               }
+       }
+       fields := iterateFields(node.fields)
+       return arrow.NewSchema(fields, nil), nil
+}
+
+func iterateFields(f []interface{}) []arrow.Field {
+       var s []arrow.Field
+       for _, field := range f {
+               var n schemaNode
+               n.name = field.(map[string]interface{})["name"].(string)
+               n.ofType = field.(map[string]interface{})["type"]
+               switch n.ofType.(type) {
+               // Getting field type from within field object
+               case string:
+                       switch n.ofType.(string) {
+                       case "enum":
+                               for _, symbol := range 
field.(map[string]interface{})["symbols"].([]interface{}) {
+                                       n.symbols = append(n.symbols, 
symbol.(string))
+                               }
+                       default:
+                               if lt, ok := 
field.(map[string]interface{})["logicalType"]; ok {
+                                       n.logicalType = lt.(string)
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["size"]; ok {
+                                       n.size = int(lt.(float64))
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["precision"]; ok {
+                                       n.precision = int32(lt.(float64))
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["scale"]; ok {
+                                       n.scale = int32(lt.(float64))
+                               }
+                       }
+               // Field type is an object
+               case map[string]interface{}:
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; 
ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; 
ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               case []interface{}:
+
+               default:
+                       if lt, ok := 
field.(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["size"]; ok 
{
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["scale"]; 
ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               }
+               // Field is of type "record"
+               if nf, f := field.(map[string]interface{})["fields"]; f {
+                       switch nf.(type) {
+                       // primitive & complex types
+                       case map[string]interface{}:
+                               for _, v := range 
nf.(map[string]interface{})["fields"].([]interface{}) {
+                                       n.fields = append(n.fields, 
v.(map[string]interface{}))
+                               }
+                       // type unions
+                       default:
+                               for _, v := range nf.([]interface{}) {
+                                       n.fields = append(n.fields, 
v.(map[string]interface{}))
+                               }
+                       }
+               }
+               s = append(s, traverseNodes(n))
+       }
+       return s
+}
+
+func traverseNodes(node schemaNode) arrow.Field {
+       switch node.ofType.(type) {
+       case string:
+               // Avro primitive type
+               if len(node.fields) == 0 {
+                       switch node.ofType.(string) {
+                       case "boolean", "int", "long", "float", "double", 
"bytes", "string":
+                               if node.logicalType != "" {
+                                       return avroLogicalToArrowField(node)
+                               }
+                               //  Avro primitive type
+                               return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(node.ofType.(string))}
+                       case "fixed":
+                               // Duration type is not supported in 
github.com/linkedin/goavro
+                               // Implementing as Binary for now.
+                               switch node.logicalType {
+                               case "decimal":
+                                       return avroLogicalToArrowField(node)
+                               case "duration":
+                                       return arrow.Field{Name: node.name, 
Type: arrow.BinaryTypes.Binary}
+                                       //return arrow.Field{Name: node.name, 
Type: arrow.FixedWidthTypes.MonthDayNanoInterval}
+                               }
+                               return arrow.Field{Name: node.name, Type: 
&arrow.FixedSizeBinaryType{ByteWidth: node.size}}
+                       case "enum":
+                               symbols := make(map[string]string)
+                               for index, symbol := range node.symbols {
+                                       k := strconv.FormatInt(int64(index), 10)
+                                       symbols[k] = symbol
+                               }
+                               var dt arrow.DictionaryType = 
arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: 
arrow.BinaryTypes.String, Ordered: false}
+                               sl := len(symbols)
+                               switch {
+                               case sl <= math.MaxUint8:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint8
+                               case sl > math.MaxUint8 && sl <= math.MaxUint16:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint16
+                               case sl > math.MaxUint16 && sl <= 
math.MaxUint32:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint32
+                               }
+                               return arrow.Field{Name: node.name, Type: &dt, 
Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
+                       default:
+                               return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(node.ofType.(string))}
+                       }
+               } else {
+                       // avro "record" type, node has "fields" array
+                       if node.ofType.(string) == "record" {
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = node.ofType
+                               if len(node.fields) > 0 {
+                                       n.fields = append(n.fields, 
node.fields...)
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: 
arrow.StructOf(f...)}
+                       }
+               }
+       // Avro complex types
+       case map[string]interface{}:
+               //var n schemaNode
+               //n.name = node.name
+               //n.ofType = node.ofType.(map[string]interface{})["type"]
+               switch node.logicalType {
+               case "":
+                       return avroComplexToArrowField(node)
+               default:
+                       return avroLogicalToArrowField(node)
+               }
+
+       // Avro union types
+       case []interface{}:
+               var unionTypes []string
+               for _, ft := range node.ofType.([]interface{}) {
+                       switch ft.(type) {
+                       // primitive types
+                       case string:
+                               if ft != "null" {
+                                       unionTypes = append(unionTypes, 
ft.(string))
+                               }
+                               continue
+                       // complex types
+                       case map[string]interface{}:
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = ft.(map[string]interface{})["type"]
+                               if _, f := 
ft.(map[string]interface{})["fields"]; f {
+                                       for _, field := range 
ft.(map[string]interface{})["fields"].([]interface{}) {
+                                               n.fields = append(n.fields, 
field.(map[string]interface{}))
+                                       }
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: 
arrow.StructOf(f...)}
+                       }
+               }
+               // Supported Avro union type is null + one other type.
+               // TODO: Complex AVRO union to Arrow Dense || Sparse Union.
+               if len(unionTypes) == 1 {
+                       return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(unionTypes[0])}
+               } else {
+                       // BYTE_ARRAY is the catchall if union type is anything 
beyond null + one other type.
+                       return arrow.Field{Name: node.name, Type: 
arrow.BinaryTypes.Binary}
+               }
+       }
+       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+}
+
+func avroLogicalToArrowField(node schemaNode) arrow.Field {
+       // Avro logical types
+       switch node.logicalType {
+       // The decimal logical type represents an arbitrary-precision signed decimal
+       // number of the form unscaled × 10^-scale.
+       // A decimal logical type annotates Avro bytes or fixed types. The byte array
+       // must contain the two’s-complement representation of the unscaled integer
+       // value in big-endian byte order. The scale is fixed, and is specified
+       // using an attribute.
+       //
+       // The following attributes are supported:
+       // scale, a JSON integer representing the scale (optional). If not specified
+       // the scale is 0.
+       // precision, a JSON integer representing the (maximum) precision of
+       // decimals stored in this type (required).
+       case "decimal":
+               if node.precision <= 38 {
+                       return arrow.Field{Name: node.name, Type: 
&arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}}
+               } else {
+                       return arrow.Field{Name: node.name, Type: 
&arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}}
+               }

Review Comment:
   probably easier to simplify this function and switch to something like:
   
   ```go
   var dt arrow.DataType
   switch node.logicalType {
   // ...
   case "decimal":
        id := arrow.DECIMAL128
        if node.precision > 38 {
                id = arrow.DECIMAL256
        }
        dt, _ = arrow.NewDecimalType(id, node.precision, node.scale)
   case "uuid":
        dt = types.NewUUIDType() // import "github.com/apache/arrow/go/v13/internal/types"
   // etc.....
   }
   return arrow.Field{Name: node.name, Type: dt}
   ```
   



##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}
+
+// WithTopLevel specifies whether to include an Avro schema's top level record
+// or only its fields.
+func WithTopLevel(b bool) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.topLevel = b
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building 
records.
+func WithAllocator(mem memory.Allocator) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.mem = mem
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole Avro OCF file into memory 
and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.chunk = n
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// DefaultNullValues is the set of values considered as NULL values by default
+// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"}

Review Comment:
   Doesn't Avro already have a concept of null, so you don't need to interpret strings as null?
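
   For context: nullability in Avro is expressed as a union with `"null"`,
   so no sentinel strings should be needed (schema snippet shown as a Go
   string purely for illustration):

   ```go
   // A nullable Avro field is declared as a union with "null"; decoders
   // such as goavro or hamba/avro yield a Go nil for the null branch.
   const nullableField = `{"name": "nickname", "type": ["null", "string"], "default": null}`
   ```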



##########
go/arrow/avro/common.go:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+var (
+       ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+)
+
+// Option configures an Avro reader/writer.
+type Option func(config)
+type config interface{}
+
+// WithTopLevel specifies whether to include an Avro schema's top level record
+// or only its fields.
+func WithTopLevel(b bool) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.topLevel = b
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building 
records.
+func WithAllocator(mem memory.Allocator) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.mem = mem
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole Avro OCF file into memory 
and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.chunk = n
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// DefaultNullValues is the set of values considered as NULL values by default
+// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"}
+
+// WithNullReader sets options for the OCF Reader pertaining to NULL value
+// handling. If stringsCanBeNull is true, then a string that matches one of the
+// nullValues set will be interpreted as NULL. Numeric columns will be checked
+// for nulls in all cases. If no nullValues arguments are passed in, the
+// defaults set in NewReader() will be kept.
+//
+// When no NULL values are given, the default set is taken from DefaultNullValues.
+func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       cfg.stringsCanBeNull = stringsCanBeNull
+
+                       if len(nullValues) == 0 {
+                               nullValues = DefaultNullValues
+                       }
+                       cfg.nulls = make([]string, len(nullValues))
+                       copy(cfg.nulls, nullValues)
+               default:
+                       panic(fmt.Errorf("arrow/avro: unknown config type %T", 
cfg))
+               }
+       }
+}
+
+// WithColumnTypes allows specifying optional per-column types (disabling
+// type inference on those columns).
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithColumnTypes(types map[string]arrow.DataType) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use 
WithColumnTypes with explicit schema", arrow.ErrInvalid))
+                       }
+                       cfg.columnTypes = types
+               default:
+                       panic(fmt.Errorf("%w: WithColumnTypes only allowed for 
csv reader", arrow.ErrInvalid))
+               }
+       }
+}
+
+// WithIncludeColumns indicates the names of the columns from the AVRO file
+// that should actually be read and converted (in the slice's order).
+// If set and non-empty, columns not in this slice will be ignored.
+//
+// Will panic if used in conjunction with an explicit schema.
+func WithIncludeColumns(cols []string) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *OCFReader:
+                       if cfg.schema != nil {
+                               panic(fmt.Errorf("%w: cannot use 
WithIncludeColumns with explicit schema", arrow.ErrInvalid))
+                       }

Review Comment:
   Why do we need the explicit-schema restriction to do this? We have the Avro schema, so we can throw an error if a requested field is not found, etc.
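
   For example, a hypothetical check against the Avro schema's own field set
   (`avroFieldSet` is assumed to be built while parsing the schema):

   ```go
   for _, name := range cols {
       if _, ok := avroFieldSet[name]; !ok {
           return fmt.Errorf("%w: column %q not found in avro schema", arrow.ErrInvalid, name)
       }
   }
   ```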



##########
go/arrow/avro/schema.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strconv"
+
+       "github.com/apache/arrow/go/v13/arrow"
+)
+
+type schemaNode struct {
+       name        string
+       ofType      interface{}
+       logicalType string
+       precision   int32
+       scale       int32
+       size        int
+       symbols     []string
+       fields      []interface{}
+}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON.
+// If the top level is a record type, use the WithTopLevel option to either make
+// its fields top-level fields in the resulting schema or nest them in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) {
+       var m map[string]interface{}
+       var node schemaNode
+       json.Unmarshal(avroSchema, &m)
+       if m["type"].(string) == "record" {
+               if _, ok := m["fields"]; ok {
+                       for _, field := range m["fields"].([]interface{}) {
+                               node.fields = append(node.fields, 
field.(map[string]interface{}))
+                       }
+                       if len(node.fields) == 0 {
+                               return nil, fmt.Errorf("invalid avro schema: no top level record fields found")
+                       }
+               }
+       }
+       fields := iterateFields(node.fields)
+       return arrow.NewSchema(fields, nil), nil
+}
+
+func iterateFields(f []interface{}) []arrow.Field {
+       var s []arrow.Field
+       for _, field := range f {
+               var n schemaNode
+               n.name = field.(map[string]interface{})["name"].(string)
+               n.ofType = field.(map[string]interface{})["type"]
+               switch n.ofType.(type) {
+               // Getting field type from within field object
+               case string:
+                       switch n.ofType.(string) {
+                       case "enum":
+                               for _, symbol := range 
field.(map[string]interface{})["symbols"].([]interface{}) {
+                                       n.symbols = append(n.symbols, 
symbol.(string))
+                               }
+                       default:
+                               if lt, ok := 
field.(map[string]interface{})["logicalType"]; ok {
+                                       n.logicalType = lt.(string)
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["size"]; ok {
+                                       n.size = int(lt.(float64))
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["precision"]; ok {
+                                       n.precision = int32(lt.(float64))
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["scale"]; ok {
+                                       n.scale = int32(lt.(float64))
+                               }
+                       }
+               // Field type is an object
+               case map[string]interface{}:
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; 
ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; 
ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               case []interface{}:
+
+               default:
+                       if lt, ok := 
field.(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["size"]; ok 
{
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["scale"]; 
ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               }
+               // Field is of type "record"
+               if nf, f := field.(map[string]interface{})["fields"]; f {
+                       switch nf.(type) {
+                       // primitive & complex types
+                       case map[string]interface{}:
+                               for _, v := range 
nf.(map[string]interface{})["fields"].([]interface{}) {
+                                       n.fields = append(n.fields, 
v.(map[string]interface{}))
+                               }
+                       // type unions
+                       default:
+                               for _, v := range nf.([]interface{}) {
+                                       n.fields = append(n.fields, 
v.(map[string]interface{}))
+                               }
+                       }
+               }
+               s = append(s, traverseNodes(n))
+       }
+       return s
+}
+
+func traverseNodes(node schemaNode) arrow.Field {
+       switch node.ofType.(type) {
+       case string:
+               // Avro primitive type
+               if len(node.fields) == 0 {
+                       switch node.ofType.(string) {
+                       case "boolean", "int", "long", "float", "double", 
"bytes", "string":
+                               if node.logicalType != "" {
+                                       return avroLogicalToArrowField(node)
+                               }
+                               //  Avro primitive type
+                               return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(node.ofType.(string))}
+                       case "fixed":
+                               // Duration type is not supported in 
github.com/linkedin/goavro
+                               // Implementing as Binary for now.
+                               switch node.logicalType {
+                               case "decimal":
+                                       return avroLogicalToArrowField(node)
+                               case "duration":
+                                       return arrow.Field{Name: node.name, 
Type: arrow.BinaryTypes.Binary}
+                                       //return arrow.Field{Name: node.name, 
Type: arrow.FixedWidthTypes.MonthDayNanoInterval}
+                               }
+                               return arrow.Field{Name: node.name, Type: 
&arrow.FixedSizeBinaryType{ByteWidth: node.size}}
+                       case "enum":
+                               symbols := make(map[string]string)
+                               for index, symbol := range node.symbols {
+                                       k := strconv.FormatInt(int64(index), 10)
+                                       symbols[k] = symbol
+                               }
+                               var dt arrow.DictionaryType = 
arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: 
arrow.BinaryTypes.String, Ordered: false}
+                               sl := len(symbols)
+                               switch {
+                               case sl <= math.MaxUint8:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint8
+                               case sl > math.MaxUint8 && sl <= math.MaxUint16:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint16
+                               case sl > math.MaxUint16 && sl <= 
math.MaxUint32:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint32
+                               }
+                               return arrow.Field{Name: node.name, Type: &dt, 
Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
+                       default:
+                               return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(node.ofType.(string))}
+                       }
+               } else {
+                       // avro "record" type, node has "fields" array
+                       if node.ofType.(string) == "record" {
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = node.ofType
+                               if len(node.fields) > 0 {
+                                       n.fields = append(n.fields, 
node.fields...)
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: 
arrow.StructOf(f...)}
+                       }
+               }
+       // Avro complex types
+       case map[string]interface{}:
+               //var n schemaNode
+               //n.name = node.name
+               //n.ofType = node.ofType.(map[string]interface{})["type"]
+               switch node.logicalType {
+               case "":
+                       return avroComplexToArrowField(node)
+               default:
+                       return avroLogicalToArrowField(node)
+               }
+
+       // Avro union types
+       case []interface{}:
+               var unionTypes []string
+               for _, ft := range node.ofType.([]interface{}) {
+                       switch ft.(type) {
+                       // primitive types
+                       case string:
+                               if ft != "null" {
+                                       unionTypes = append(unionTypes, 
ft.(string))
+                               }
+                               continue
+                       // complex types
+                       case map[string]interface{}:
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = ft.(map[string]interface{})["type"]
+                               if _, f := 
ft.(map[string]interface{})["fields"]; f {
+                                       for _, field := range 
ft.(map[string]interface{})["fields"].([]interface{}) {
+                                               n.fields = append(n.fields, 
field.(map[string]interface{}))
+                                       }
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: 
arrow.StructOf(f...)}
+                       }
+               }
+               // Supported Avro union type is null + one other type.
+               // TODO: Complex AVRO union to Arrow Dense || Sparse Union.
+               if len(unionTypes) == 1 {
+                       return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(unionTypes[0])}
+               } else {
+                       // BYTE_ARRAY is the catchall if union type is anything 
beyond null + one other type.
+                       return arrow.Field{Name: node.name, Type: 
arrow.BinaryTypes.Binary}
+               }
+       }
+       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+}
+
+func avroLogicalToArrowField(node schemaNode) arrow.Field {
+       // Avro logical types
+       switch node.logicalType {
+       // The decimal logical type represents an arbitrary-precision signed decimal
+       // number of the form unscaled × 10^-scale.
+       // A decimal logical type annotates Avro bytes or fixed types. The byte array
+       // must contain the two’s-complement representation of the unscaled integer
+       // value in big-endian byte order. The scale is fixed, and is specified
+       // using an attribute.
+       //
+       // The following attributes are supported:
+       // scale, a JSON integer representing the scale (optional). If not specified
+       // the scale is 0.
+       // precision, a JSON integer representing the (maximum) precision of
+       // decimals stored in this type (required).
+       case "decimal":
+               if node.precision <= 38 {
+                       return arrow.Field{Name: node.name, Type: 
&arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}}
+               } else {
+                       return arrow.Field{Name: node.name, Type: 
&arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}}
+               }
+
+       // The uuid logical type represents a randomly generated universally
+       // unique identifier (UUID).
+       // A uuid logical type annotates an Avro string. The string has to
+       // conform with RFC-4122.
+       case "uuid":
+               return arrow.Field{Name: node.name, Type: 
arrow.BinaryTypes.String}
+
+       // The date logical type represents a date within the calendar, with no 
reference to a particular
+       // time zone or time of day.
+       // A date logical type annotates an Avro int, where the int stores the 
number of days from the unix epoch,
+       // 1 January 1970 (ISO calendar).
+       case "date":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Date32}
+
+       // The time-millis logical type represents a time of day, with no 
reference to a particular calendar,
+       // time zone or date, with a precision of one millisecond.
+       // A time-millis logical type annotates an Avro int, where the int 
stores the number of milliseconds
+       // after midnight, 00:00:00.000.
+       case "time-millis":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Time32ms}
+
+       // The time-micros logical type represents a time of day, with no 
reference to a particular calendar,
+       // time zone or date, with a precision of one microsecond.
+       // A time-micros logical type annotates an Avro long, where the long 
stores the number of microseconds
+       // after midnight, 00:00:00.000000.
+       case "time-micros":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Time64us}
+
+       // The timestamp-millis logical type represents an instant on the 
global timeline, independent of a
+       // particular time zone or calendar, with a precision of one 
millisecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we 
can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps 
are typically displayed to users in
+       // their local time zones, therefore they may be displayed differently 
depending on the execution environment.
+       // A timestamp-millis logical type annotates an Avro long, where the 
long stores the number of milliseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+       case "timestamp-millis":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Timestamp_ms}
+       // The timestamp-micros logical type represents an instant on the 
global timeline, independent of a
+       // particular time zone or calendar, with a precision of one 
microsecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we 
can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps 
are typically displayed to users
+       // in their local time zones, therefore they may be displayed 
differently depending on the execution environment.
+       // A timestamp-micros logical type annotates an Avro long, where the 
long stores the number of microseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+       case "timestamp-micros":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Timestamp_us}
+       // The local-timestamp-millis logical type represents a timestamp in a 
local timezone, regardless of
+       // what specific time zone is considered local, with a precision of one 
millisecond.
+       // A local-timestamp-millis logical type annotates an Avro long, where 
the long stores the number of
+       // milliseconds, from 1 January 1970 00:00:00.000.
+       case "local-timestamp-millis":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Timestamp_ms}

Review Comment:
   the default `arrow.FixedWidthTypes.Timestamp_ms` sets the timezone to "UTC". 
You should manually create `&arrow.TimestampType{Unit: arrow.Millisecond}` for 
the local timestamp (or maybe we should create `NoTZ` versions of these vars 
that you can use for this....?
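
   For comparison, a sketch of the two types side by side:

   ```go
   // What arrow.FixedWidthTypes.Timestamp_ms amounts to:
   utcMs := &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}
   // For local-timestamp-millis, leave the zone unset:
   localMs := &arrow.TimestampType{Unit: arrow.Millisecond}
   _, _ = utcMs, localMs
   ```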



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
