loicalleyne commented on code in PR #36796:
URL: https://github.com/apache/arrow/pull/36796#discussion_r1281162168


##########
go/arrow/avro/schema.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strconv"
+
+       "github.com/apache/arrow/go/v13/arrow"
+)
+
// schemaNode collects the parts of one Avro schema entry that are needed to
// derive the matching Arrow field. Field order is significant for positional
// composite literals and must not change.
type schemaNode struct {
	name   string      // Avro field name
	ofType interface{} // raw decoded "type" value: a type name, an inline type object, or a union list

	// Optional Avro type attributes.
	logicalType      string
	precision, scale int32
	size             int

	symbols []string      // enum symbols, in declaration order
	fields  []interface{} // nested field definitions of a record
}
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON.
+// If the top level is of record type, set includeTopLevel to either make
+// its fields top level fields in the resulting schema or nested in a single 
field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) {
+       var m map[string]interface{}
+       var node schemaNode
+       json.Unmarshal(avroSchema, &m)
+       if m["type"].(string) == "record" {
+               if _, ok := m["fields"]; ok {
+                       for _, field := range m["fields"].([]interface{}) {
+                               node.fields = append(node.fields, 
field.(map[string]interface{}))
+                       }
+                       if len(node.fields) == 0 {
+                               return nil, fmt.Errorf("invalid avro schema: no 
top level record fields found")
+                       }
+               }
+       }
+       fields := iterateFields(node.fields)
+       return arrow.NewSchema(fields, nil), nil
+}
+
+func iterateFields(f []interface{}) []arrow.Field {
+       var s []arrow.Field
+       for _, field := range f {
+               var n schemaNode
+               n.name = field.(map[string]interface{})["name"].(string)
+               n.ofType = field.(map[string]interface{})["type"]
+               switch n.ofType.(type) {
+               // Getting field type from within field object
+               case string:
+                       switch n.ofType.(string) {
+                       case "enum":
+                               for _, symbol := range 
field.(map[string]interface{})["symbols"].([]interface{}) {
+                                       n.symbols = append(n.symbols, 
symbol.(string))
+                               }
+                       default:
+                               if lt, ok := 
field.(map[string]interface{})["logicalType"]; ok {
+                                       n.logicalType = lt.(string)
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["size"]; ok {
+                                       n.size = int(lt.(float64))
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["precision"]; ok {
+                                       n.precision = int32(lt.(float64))
+                               }
+                               if lt, ok := 
field.(map[string]interface{})["scale"]; ok {
+                                       n.scale = int32(lt.(float64))
+                               }
+                       }
+               // Field type is an object
+               case map[string]interface{}:
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; 
ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok {
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; 
ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               case []interface{}:
+
+               default:
+                       if lt, ok := 
field.(map[string]interface{})["logicalType"]; ok {
+                               n.logicalType = lt.(string)
+                       }
+                       if lt, ok := field.(map[string]interface{})["size"]; ok 
{
+                               n.size = int(lt.(float64))
+                       }
+                       if lt, ok := 
field.(map[string]interface{})["precision"]; ok {
+                               n.precision = int32(lt.(float64))
+                       }
+                       if lt, ok := field.(map[string]interface{})["scale"]; 
ok {
+                               n.scale = int32(lt.(float64))
+                       }
+               }
+               // Field is of type "record"
+               if nf, f := field.(map[string]interface{})["fields"]; f {
+                       switch nf.(type) {
+                       // primitive & complex types
+                       case map[string]interface{}:
+                               for _, v := range 
nf.(map[string]interface{})["fields"].([]interface{}) {
+                                       n.fields = append(n.fields, 
v.(map[string]interface{}))
+                               }
+                       // type unions
+                       default:
+                               for _, v := range nf.([]interface{}) {
+                                       n.fields = append(n.fields, 
v.(map[string]interface{}))
+                               }
+                       }
+               }
+               s = append(s, traverseNodes(n))
+       }
+       return s
+}
+
+func traverseNodes(node schemaNode) arrow.Field {
+       switch node.ofType.(type) {
+       case string:
+               // Avro primitive type
+               if len(node.fields) == 0 {
+                       switch node.ofType.(string) {
+                       case "boolean", "int", "long", "float", "double", 
"bytes", "string":
+                               if node.logicalType != "" {
+                                       return avroLogicalToArrowField(node)
+                               }
+                               //  Avro primitive type
+                               return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(node.ofType.(string))}
+                       case "fixed":
+                               // Duration type is not supported in 
github.com/linkedin/goavro
+                               // Implementing as Binary for now.
+                               switch node.logicalType {
+                               case "decimal":
+                                       return avroLogicalToArrowField(node)
+                               case "duration":
+                                       return arrow.Field{Name: node.name, 
Type: arrow.BinaryTypes.Binary}
+                                       //return arrow.Field{Name: node.name, 
Type: arrow.FixedWidthTypes.MonthDayNanoInterval}
+                               }
+                               return arrow.Field{Name: node.name, Type: 
&arrow.FixedSizeBinaryType{ByteWidth: node.size}}
+                       case "enum":
+                               symbols := make(map[string]string)
+                               for index, symbol := range node.symbols {
+                                       k := strconv.FormatInt(int64(index), 10)
+                                       symbols[k] = symbol
+                               }
+                               var dt arrow.DictionaryType = 
arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: 
arrow.BinaryTypes.String, Ordered: false}
+                               sl := len(symbols)
+                               switch {
+                               case sl <= math.MaxUint8:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint8
+                               case sl > math.MaxUint8 && sl <= math.MaxUint16:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint16
+                               case sl > math.MaxUint16 && sl <= 
math.MaxUint32:
+                                       dt.IndexType = 
arrow.PrimitiveTypes.Uint32
+                               }
+                               return arrow.Field{Name: node.name, Type: &dt, 
Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
+                       default:
+                               return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(node.ofType.(string))}
+                       }
+               } else {
+                       // avro "record" type, node has "fields" array
+                       if node.ofType.(string) == "record" {
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = node.ofType
+                               if len(node.fields) > 0 {
+                                       n.fields = append(n.fields, 
node.fields...)
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: 
arrow.StructOf(f...)}
+                       }
+               }
+       // Avro complex types
+       case map[string]interface{}:
+               //var n schemaNode
+               //n.name = node.name
+               //n.ofType = node.ofType.(map[string]interface{})["type"]
+               switch node.logicalType {
+               case "":
+                       return avroComplexToArrowField(node)
+               default:
+                       return avroLogicalToArrowField(node)
+               }
+
+       // Avro union types
+       case []interface{}:
+               var unionTypes []string
+               for _, ft := range node.ofType.([]interface{}) {
+                       switch ft.(type) {
+                       // primitive types
+                       case string:
+                               if ft != "null" {
+                                       unionTypes = append(unionTypes, 
ft.(string))
+                               }
+                               continue
+                       // complex types
+                       case map[string]interface{}:
+                               var n schemaNode
+                               n.name = node.name
+                               n.ofType = ft.(map[string]interface{})["type"]
+                               if _, f := 
ft.(map[string]interface{})["fields"]; f {
+                                       for _, field := range 
ft.(map[string]interface{})["fields"].([]interface{}) {
+                                               n.fields = append(n.fields, 
field.(map[string]interface{}))
+                                       }
+                               }
+                               f := iterateFields(n.fields)
+                               return arrow.Field{Name: node.name, Type: 
arrow.StructOf(f...)}
+                       }
+               }
+               // Supported Avro union type is null + one other type.
+               // TODO: Complex AVRO union to Arrow Dense || Sparse Union.
+               if len(unionTypes) == 1 {
+                       return arrow.Field{Name: node.name, Type: 
AvroPrimitiveToArrowType(unionTypes[0])}
+               } else {
+                       // BYTE_ARRAY is the catchall if union type is anything 
beyond null + one other type.
+                       return arrow.Field{Name: node.name, Type: 
arrow.BinaryTypes.Binary}
+               }
+       }
+       return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary}
+}
+
+func avroLogicalToArrowField(node schemaNode) arrow.Field {
+       // Avro logical types
+       switch node.logicalType {
+       // The decimal logical type represents an arbitrary-precision signed 
decimal number of the form unscaled × 10-scale.
+       // A decimal logical type annotates Avro bytes or fixed types. The byte 
array must contain the two’s-complement
+       // representation of the unscaled integer value in big-endian byte 
order. The scale is fixed, and is specified
+       // using an attribute.
+       //
+       // The following attributes are supported:
+       // scale, a JSON integer representing the scale (optional). If not 
specified the scale is 0.
+       // precision, a JSON integer representing the (maximum) precision of 
decimals stored in this type (required).
+       case "decimal":
+               if node.precision <= 38 {
+                       return arrow.Field{Name: node.name, Type: 
&arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}}
+               } else {
+                       return arrow.Field{Name: node.name, Type: 
&arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}}
+               }
+
+       // The uuid logical type represents a random generated universally 
unique identifier (UUID).
+       // A uuid logical type annotates an Avro string. The string has to 
conform with RFC-4122
+       case "uuid":
+               return arrow.Field{Name: node.name, Type: 
arrow.BinaryTypes.String}
+
+       // The date logical type represents a date within the calendar, with no 
reference to a particular
+       // time zone or time of day.
+       // A date logical type annotates an Avro int, where the int stores the 
number of days from the unix epoch,
+       // 1 January 1970 (ISO calendar).
+       case "date":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Date32}
+
+       // The time-millis logical type represents a time of day, with no 
reference to a particular calendar,
+       // time zone or date, with a precision of one millisecond.
+       // A time-millis logical type annotates an Avro int, where the int 
stores the number of milliseconds
+       // after midnight, 00:00:00.000.
+       case "time-millis":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Time32ms}
+
+       // The time-micros logical type represents a time of day, with no 
reference to a particular calendar,
+       // time zone or date, with a precision of one microsecond.
+       // A time-micros logical type annotates an Avro long, where the long 
stores the number of microseconds
+       // after midnight, 00:00:00.000000.
+       case "time-micros":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Time64us}
+
+       // The timestamp-millis logical type represents an instant on the 
global timeline, independent of a
+       // particular time zone or calendar, with a precision of one 
millisecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we 
can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps 
are typically displayed to users in
+       // their local time zones, therefore they may be displayed differently 
depending on the execution environment.
+       // A timestamp-millis logical type annotates an Avro long, where the 
long stores the number of milliseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+       case "timestamp-millis":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Timestamp_ms}
+       // The timestamp-micros logical type represents an instant on the 
global timeline, independent of a
+       // particular time zone or calendar, with a precision of one 
microsecond. Please note that time zone
+       // information gets lost in this process. Upon reading a value back, we 
can only reconstruct the instant,
+       // but not the original representation. In practice, such timestamps 
are typically displayed to users
+       // in their local time zones, therefore they may be displayed 
differently depending on the execution environment.
+       // A timestamp-micros logical type annotates an Avro long, where the 
long stores the number of microseconds
+       // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+       case "timestamp-micros":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Timestamp_us}
+       // The local-timestamp-millis logical type represents a timestamp in a 
local timezone, regardless of
+       // what specific time zone is considered local, with a precision of one 
millisecond.
+       // A local-timestamp-millis logical type annotates an Avro long, where 
the long stores the number of
+       // milliseconds, from 1 January 1970 00:00:00.000.
+       case "local-timestamp-millis":
+               return arrow.Field{Name: node.name, Type: 
arrow.FixedWidthTypes.Timestamp_ms}

Review Comment:
   Perhaps the NoTZ versions would be better - I was going on the assumption that 
the users of the data would have to apply the time zone to the data themselves.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to