zeroshade commented on code in PR #36796:
URL: https://github.com/apache/arrow/pull/36796#discussion_r1281093354
########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} Review Comment: Either use a separate config struct or just use the `OCFReader` as the config type; don't bother with it being an interface and doing the type switch. I don't think there's any benefit to doing it this way. See the sketch below.
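For illustration, a rough (untested) sketch of that shape, assuming the option funcs mutate the `OCFReader` fields from this diff directly:

```go
// Option configures an Avro OCF reader.
type Option func(*OCFReader)

// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
	return func(r *OCFReader) { r.mem = mem }
}

// WithChunk specifies the chunk size used while reading Avro OCF files.
func WithChunk(n int) Option {
	return func(r *OCFReader) { r.chunk = n }
}

// The reader constructor then applies options with no type switching:
//
//	for _, opt := range opts {
//		opt(rdr)
//	}
```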
########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} + +// WithTopLevel specifies whether to include an Avro schema's top level record +// or only its fields. +func WithTopLevel(b bool) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.topLevel = b + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values. +var DefaultNullValues = []string{"", "NULL", "null"} + +// WithNullReader sets options for a CSV Reader pertaining to NULL value +// handling. If stringsCanBeNull is true, then a string that matches one of the +// nullValues set will be interpreted as NULL. Numeric columns will be checked +// for nulls in all cases. If no nullValues arguments are passed in, the +// defaults set in NewReader() will be kept. +// +// When no NULL values is given, the default set is taken from DefaultNullValues. +func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.stringsCanBeNull = stringsCanBeNull + + if len(nullValues) == 0 { + nullValues = DefaultNullValues + } + cfg.nulls = make([]string, len(nullValues)) + copy(cfg.nulls, nullValues) + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} Review Comment: See previous comment. Doesn't Avro have a concept of a null value already, so we don't need the "null reader" handling that you need for a CSV or other text-based reader? (For example, note that there's nothing like this in the JSON reader, because if a value needs to be null it should be a JSON null.)
+func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values. +var DefaultNullValues = []string{"", "NULL", "null"} + +// WithNullReader sets options for a CSV Reader pertaining to NULL value +// handling. If stringsCanBeNull is true, then a string that matches one of the +// nullValues set will be interpreted as NULL. Numeric columns will be checked +// for nulls in all cases. If no nullValues arguments are passed in, the +// defaults set in NewReader() will be kept. +// +// When no NULL values is given, the default set is taken from DefaultNullValues. +func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.stringsCanBeNull = stringsCanBeNull + + if len(nullValues) == 0 { + nullValues = DefaultNullValues + } + cfg.nulls = make([]string, len(nullValues)) + copy(cfg.nulls, nullValues) + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithColumnTypes allows specifying optional per-column types (disabling +// type inference on those columns). +// +// Will panic if used in conjunction with an explicit schema. +func WithColumnTypes(types map[string]arrow.DataType) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid)) + } + cfg.columnTypes = types + default: + panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid)) + } + } +} + +// WithIncludeColumns indicates the names of the columns from the AVRO file +// that should actually be read and converted (in the slice's order). +// If set and non-empty, columns not in this slice will be ignored. +// +// Will panic if used in conjunction with an explicit schema. 
+func WithIncludeColumns(cols []string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithIncludeColumns with explicit schema", arrow.ErrInvalid)) + } + cfg.columnFilter = cols + default: + panic(fmt.Errorf("%w: WithIncludeColumns only allowed on csv Reader", arrow.ErrInvalid)) + } + } +} + +func validate(schema *arrow.Schema) { + for i, f := range schema.Fields() { + switch ft := f.Type.(type) { + case *arrow.BooleanType: + case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type: + case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type: + case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type: + case *arrow.StringType, *arrow.LargeStringType: + case *arrow.TimestampType: + case *arrow.Date32Type, *arrow.Date64Type: + case *arrow.Decimal128Type, *arrow.Decimal256Type: + case *arrow.ListType, *arrow.LargeListType, *arrow.FixedSizeListType: + case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType: + case arrow.ExtensionType: + case *arrow.NullType: + default: + panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft)) + } + } +} Review Comment: should we even allow specifying an arrow schema rather than performing the conversion? ########## go/arrow/avro/schema.go: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package avro reads Avro OCF files and presents the extracted data as records +package avro + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + + "github.com/apache/arrow/go/v13/arrow" +) + +type schemaNode struct { + name string + ofType interface{} + logicalType string + precision int32 + scale int32 + size int + symbols []string + fields []interface{} +} + +// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON. +// If the top level is of record type, set includeTopLevel to either make +// its fields top level fields in the resulting schema or nested in a single field. 
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) { + var m map[string]interface{} + var node schemaNode + json.Unmarshal(avroSchema, &m) + if m["type"].(string) == "record" { + if _, ok := m["fields"]; ok { + for _, field := range m["fields"].([]interface{}) { + node.fields = append(node.fields, field.(map[string]interface{})) + } + if len(node.fields) == 0 { + return nil, fmt.Errorf("invalid avro schema: no top level record fields found") + } + } + } + fields := iterateFields(node.fields) + return arrow.NewSchema(fields, nil), nil +} Review Comment: Avro has a well-defined spec for its JSON schema; why not create a struct to represent it and utilize `json` struct tags + `UnmarshalJSON`/`MarshalJSON` functions to simplify the unmarshalling? Even better, https://pkg.go.dev/github.com/hamba/avro/v2#RecordSchema has a `Parse` function that will return objects representing the schema for you, likely letting you greatly simplify the `iterateFields` code to instead just iterate over the parsed objects. ########## go/arrow/avro/schema.go: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package avro reads Avro OCF files and presents the extracted data as records +package avro + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + + "github.com/apache/arrow/go/v13/arrow" +) + +type schemaNode struct { + name string + ofType interface{} + logicalType string + precision int32 + scale int32 + size int + symbols []string + fields []interface{} +} + +// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON. +// If the top level is of record type, set includeTopLevel to either make +// its fields top level fields in the resulting schema or nested in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) { + var m map[string]interface{} + var node schemaNode + json.Unmarshal(avroSchema, &m) + if m["type"].(string) == "record" { + if _, ok := m["fields"]; ok { + for _, field := range m["fields"].([]interface{}) { + node.fields = append(node.fields, field.(map[string]interface{})) + } + if len(node.fields) == 0 { + return nil, fmt.Errorf("invalid avro schema: no top level record fields found") + } + } + } + fields := iterateFields(node.fields) + return arrow.NewSchema(fields, nil), nil +} + +func iterateFields(f []interface{}) []arrow.Field { + var s []arrow.Field + for _, field := range f { + var n schemaNode + n.name = field.(map[string]interface{})["name"].(string) + n.ofType = field.(map[string]interface{})["type"] + switch n.ofType.(type) { + // Getting field type from within field object + case string: + switch n.ofType.(string) { + case "enum": + for _, symbol := range field.(map[string]interface{})["symbols"].([]interface{}) { + n.symbols = append(n.symbols, symbol.(string)) + } + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field type is an object + case map[string]interface{}: + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + case []interface{}: + + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field is of type "record" + if nf, f := field.(map[string]interface{})["fields"]; f { + switch nf.(type) { + // primitive & complex types + case map[string]interface{}: + for _, v := range nf.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + // type unions + default: + for _, v := range nf.([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + } + } + s = append(s, traverseNodes(n)) + } + return s +} + +func traverseNodes(node schemaNode) arrow.Field { + switch node.ofType.(type) { + case string: + // Avro primitive type + if len(node.fields) == 0 { + switch node.ofType.(string) { + case "boolean", "int", "long", "float", "double", "bytes", "string": + if node.logicalType != "" { + return avroLogicalToArrowField(node) + } + // Avro primitive type + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + case "fixed": + // Duration type is not supported in github.com/linkedin/goavro + // Implementing as 
Binary for now. + switch node.logicalType { + case "decimal": + return avroLogicalToArrowField(node) + case "duration": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + //return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.MonthDayNanoInterval} + } + return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: node.size}} + case "enum": + symbols := make(map[string]string) + for index, symbol := range node.symbols { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := len(symbols) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxUint32: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)} + default: + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + } + } else { + // avro "record" type, node has "fields" array + if node.ofType.(string) == "record" { + var n schemaNode + n.name = node.name + n.ofType = node.ofType + if len(node.fields) > 0 { + n.fields = append(n.fields, node.fields...) + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Avro complex types + case map[string]interface{}: + //var n schemaNode + //n.name = node.name + //n.ofType = node.ofType.(map[string]interface{})["type"] + switch node.logicalType { + case "": + return avroComplexToArrowField(node) + default: + return avroLogicalToArrowField(node) + } + + // Avro union types + case []interface{}: + var unionTypes []string + for _, ft := range node.ofType.([]interface{}) { + switch ft.(type) { + // primitive types + case string: + if ft != "null" { + unionTypes = append(unionTypes, ft.(string)) + } + continue + // complex types + case map[string]interface{}: + var n schemaNode + n.name = node.name + n.ofType = ft.(map[string]interface{})["type"] + if _, f := ft.(map[string]interface{})["fields"]; f { + for _, field := range ft.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, field.(map[string]interface{})) + } + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Supported Avro union type is null + one other type. + // TODO: Complex AVRO union to Arrow Dense || Sparse Union. + if len(unionTypes) == 1 { + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(unionTypes[0])} + } else { + // BYTE_ARRAY is the catchall if union type is anything beyond null + one other type. + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + } + } + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} +} + +func avroLogicalToArrowField(node schemaNode) arrow.Field { + // Avro logical types + switch node.logicalType { + // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10-scale. + // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement + // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified + // using an attribute. 
+ // + // The following attributes are supported: + // scale, a JSON integer representing the scale (optional). If not specified the scale is 0. + // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required). + case "decimal": + if node.precision <= 38 { + return arrow.Field{Name: node.name, Type: &arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}} + } else { + return arrow.Field{Name: node.name, Type: &arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}} + } + + // The uuid logical type represents a random generated universally unique identifier (UUID). + // A uuid logical type annotates an Avro string. The string has to conform with RFC-4122 + case "uuid": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.String} + + // The date logical type represents a date within the calendar, with no reference to a particular + // time zone or time of day. + // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, + // 1 January 1970 (ISO calendar). + case "date": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Date32} + + // The time-millis logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one millisecond. + // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds + // after midnight, 00:00:00.000. + case "time-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time32ms} + + // The time-micros logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one microsecond. + // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds + // after midnight, 00:00:00.000000. + case "time-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time64us} + + // The timestamp-millis logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one millisecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users in + // their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds + // from the unix epoch, 1 January 1970 00:00:00.000 UTC. + case "timestamp-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms} + // The timestamp-micros logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one microsecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users + // in their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds + // from the unix epoch, 1 January 1970 00:00:00.000000 UTC. 
+ case "timestamp-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us} + // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one millisecond. + // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of + // milliseconds, from 1 January 1970 00:00:00.000. + case "local-timestamp-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms} + // The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one microsecond. + // A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of + // microseconds, from 1 January 1970 00:00:00.000000. + case "local-timestamp-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us} + // Avro primitive type + default: + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + } + return arrow.Field{} Review Comment: shouldn't this be an error? ########## go/arrow/avro/schema.go: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package avro reads Avro OCF files and presents the extracted data as records +package avro + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + + "github.com/apache/arrow/go/v13/arrow" +) + +type schemaNode struct { + name string + ofType interface{} + logicalType string + precision int32 + scale int32 + size int + symbols []string + fields []interface{} +} + +// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON. +// If the top level is of record type, set includeTopLevel to either make +// its fields top level fields in the resulting schema or nested in a single field. 
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) { + var m map[string]interface{} + var node schemaNode + json.Unmarshal(avroSchema, &m) + if m["type"].(string) == "record" { + if _, ok := m["fields"]; ok { + for _, field := range m["fields"].([]interface{}) { + node.fields = append(node.fields, field.(map[string]interface{})) + } + if len(node.fields) == 0 { + return nil, fmt.Errorf("invalid avro schema: no top level record fields found") + } + } + } + fields := iterateFields(node.fields) + return arrow.NewSchema(fields, nil), nil +} + +func iterateFields(f []interface{}) []arrow.Field { + var s []arrow.Field + for _, field := range f { + var n schemaNode + n.name = field.(map[string]interface{})["name"].(string) + n.ofType = field.(map[string]interface{})["type"] + switch n.ofType.(type) { + // Getting field type from within field object + case string: + switch n.ofType.(string) { + case "enum": + for _, symbol := range field.(map[string]interface{})["symbols"].([]interface{}) { + n.symbols = append(n.symbols, symbol.(string)) + } + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field type is an object + case map[string]interface{}: + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + case []interface{}: + + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field is of type "record" + if nf, f := field.(map[string]interface{})["fields"]; f { + switch nf.(type) { + // primitive & complex types + case map[string]interface{}: + for _, v := range nf.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + // type unions + default: + for _, v := range nf.([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + } + } + s = append(s, traverseNodes(n)) + } + return s +} + +func traverseNodes(node schemaNode) arrow.Field { + switch node.ofType.(type) { + case string: + // Avro primitive type + if len(node.fields) == 0 { + switch node.ofType.(string) { + case "boolean", "int", "long", "float", "double", "bytes", "string": + if node.logicalType != "" { + return avroLogicalToArrowField(node) + } + // Avro primitive type + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + case "fixed": + // Duration type is not supported in github.com/linkedin/goavro + // Implementing as 
Binary for now. + switch node.logicalType { + case "decimal": + return avroLogicalToArrowField(node) + case "duration": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + //return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.MonthDayNanoInterval} + } + return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: node.size}} + case "enum": + symbols := make(map[string]string) + for index, symbol := range node.symbols { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := len(symbols) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxUint32: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)} + default: + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + } + } else { + // avro "record" type, node has "fields" array + if node.ofType.(string) == "record" { + var n schemaNode + n.name = node.name + n.ofType = node.ofType + if len(node.fields) > 0 { + n.fields = append(n.fields, node.fields...) + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Avro complex types + case map[string]interface{}: + //var n schemaNode + //n.name = node.name + //n.ofType = node.ofType.(map[string]interface{})["type"] + switch node.logicalType { + case "": + return avroComplexToArrowField(node) + default: + return avroLogicalToArrowField(node) + } + + // Avro union types + case []interface{}: + var unionTypes []string + for _, ft := range node.ofType.([]interface{}) { + switch ft.(type) { + // primitive types + case string: + if ft != "null" { + unionTypes = append(unionTypes, ft.(string)) + } + continue + // complex types + case map[string]interface{}: + var n schemaNode + n.name = node.name + n.ofType = ft.(map[string]interface{})["type"] + if _, f := ft.(map[string]interface{})["fields"]; f { + for _, field := range ft.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, field.(map[string]interface{})) + } + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Supported Avro union type is null + one other type. + // TODO: Complex AVRO union to Arrow Dense || Sparse Union. + if len(unionTypes) == 1 { + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(unionTypes[0])} + } else { + // BYTE_ARRAY is the catchall if union type is anything beyond null + one other type. + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + } + } + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} +} + +func avroLogicalToArrowField(node schemaNode) arrow.Field { + // Avro logical types + switch node.logicalType { + // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10-scale. + // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement + // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified + // using an attribute. 
+ // + // The following attributes are supported: + // scale, a JSON integer representing the scale (optional). If not specified the scale is 0. + // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required). + case "decimal": + if node.precision <= 38 { + return arrow.Field{Name: node.name, Type: &arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}} + } else { + return arrow.Field{Name: node.name, Type: &arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}} + } + + // The uuid logical type represents a random generated universally unique identifier (UUID). + // A uuid logical type annotates an Avro string. The string has to conform with RFC-4122 + case "uuid": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.String} + + // The date logical type represents a date within the calendar, with no reference to a particular + // time zone or time of day. + // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, + // 1 January 1970 (ISO calendar). + case "date": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Date32} + + // The time-millis logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one millisecond. + // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds + // after midnight, 00:00:00.000. + case "time-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time32ms} + + // The time-micros logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one microsecond. + // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds + // after midnight, 00:00:00.000000. + case "time-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time64us} + + // The timestamp-millis logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one millisecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users in + // their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds + // from the unix epoch, 1 January 1970 00:00:00.000 UTC. + case "timestamp-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms} + // The timestamp-micros logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one microsecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users + // in their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds + // from the unix epoch, 1 January 1970 00:00:00.000000 UTC. 
+ case "timestamp-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us} + // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one millisecond. + // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of + // milliseconds, from 1 January 1970 00:00:00.000. + case "local-timestamp-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms} + // The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one microsecond. + // A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of + // microseconds, from 1 January 1970 00:00:00.000000. + case "local-timestamp-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us} + // Avro primitive type + default: + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + } + return arrow.Field{} +} + +func avroComplexToArrowField(node schemaNode) arrow.Field { + var n schemaNode + n.name = node.name + n.ofType = node.ofType.(map[string]interface{})["type"] + // Avro "array" field type + if i, ok := node.ofType.(map[string]interface{})["items"]; ok { + switch i.(string) { + case "int", "long", "float", "double", "bytes", "boolean", "string": + return arrow.Field{Name: node.name, Type: arrow.ListOf(AvroPrimitiveToArrowType(i.(string)))} + case "enum", "fixed", "map", "record", "array": + return arrow.Field{Name: node.name, Type: arrow.ListOf(avroComplexToArrowField(n).Type)} + case "decimal", "uuid", "date", "time-millis", "time-micros", "timestamp-millis", "timestamp-micros", "local-timestamp-millis", "local-timestamp-micros": + return arrow.Field{Name: node.name, Type: arrow.ListOf(avroLogicalToArrowField(n).Type)} + } + } + // Avro "enum" field type = Arrow dictionary type + if i, ok := node.ofType.(map[string]interface{})["symbols"]; ok { + symbols := make(map[string]string) + for index, symbol := range i.([]interface{}) { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol.(string) + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := len(symbols) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxUint32: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)} + } + // Avro "fixed" field type = Arrow FixedSize Primitive BinaryType + if i, ok := node.ofType.(map[string]interface{})["size"]; ok { + return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: int(i.(float64))}} + } + // Avro "map" field type + if i, ok := node.ofType.(map[string]interface{})["values"]; ok { + return arrow.Field{Name: node.name, Type: arrow.MapOf(arrow.BinaryTypes.String, AvroPrimitiveToArrowType(i.(string)))} + } + // Avro "record" field type + if _, f := node.ofType.(map[string]interface{})["fields"]; f { + for _, field := range node.ofType.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, 
field.(map[string]interface{})) + } + s := iterateFields(n.fields) + return arrow.Field{Name: n.name, Type: arrow.StructOf(s...)} + } + return arrow.Field{} +} + +// AvroPrimitiveToArrowType returns the Arrow DataType equivalent to a +// Avro primitive type. +// +// NOTE: Arrow Binary type is used as a catchall to avoid potential data loss. +func AvroPrimitiveToArrowType(avroFieldType string) arrow.DataType { + switch avroFieldType { + // int: 32-bit signed integer + case "int": + return arrow.PrimitiveTypes.Int32 + // long: 64-bit signed integer + case "long": + return arrow.PrimitiveTypes.Int64 + // float: single precision (32-bit) IEEE 754 floating-point number + case "float": + return arrow.PrimitiveTypes.Float32 + // double: double precision (64-bit) IEEE 754 floating-point number + case "double": + return arrow.PrimitiveTypes.Float64 + // bytes: sequence of 8-bit unsigned bytes + case "bytes": + return arrow.BinaryTypes.Binary + // boolean: a binary value + case "boolean": + return arrow.FixedWidthTypes.Boolean + // string: unicode character sequence + case "string": + return arrow.BinaryTypes.String + // fallback to binary type for any unsupported type + default: + return arrow.BinaryTypes.Binary Review Comment: I'd prefer an error with `arrow.ErrNotImplemented` for any unsupported type. ########## go/go.mod: ########## @@ -59,6 +59,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kr/text v0.2.0 // indirect + github.com/linkedin/goavro/v2 v2.12.0 // indirect Review Comment: Might I suggest `github.com/hamba/avro/v2`, as it seems to be about an order of magnitude faster than goavro? It would also simplify the schema handling; see the sketch below.
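A rough, untested sketch of what the top-level conversion could look like on top of hamba/avro's parsed schema objects (`arrowTypeFromAvro` is a hypothetical helper holding the Avro-to-Arrow type mapping):

```go
package avro

import (
	"fmt"

	"github.com/apache/arrow/go/v13/arrow"
	hamba "github.com/hamba/avro/v2"
)

// arrowSchemaFromAvro parses the Avro schema JSON with hamba/avro and maps
// the top-level record's fields to an Arrow schema.
func arrowSchemaFromAvro(schemaJSON string) (*arrow.Schema, error) {
	s, err := hamba.Parse(schemaJSON)
	if err != nil {
		return nil, err
	}
	rec, ok := s.(*hamba.RecordSchema)
	if !ok {
		return nil, fmt.Errorf("arrow/avro: expected top-level record schema, got %s", s.Type())
	}
	fields := make([]arrow.Field, 0, len(rec.Fields()))
	for _, f := range rec.Fields() {
		dt, err := arrowTypeFromAvro(f.Type()) // hypothetical Avro->Arrow mapping helper
		if err != nil {
			return nil, err
		}
		fields = append(fields, arrow.Field{Name: f.Name(), Type: dt})
	}
	return arrow.NewSchema(fields, nil), nil
}
```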
########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} + +// WithTopLevel specifies whether to include an Avro schema's top level record +// or only its fields. +func WithTopLevel(b bool) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.topLevel = b + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values. +var DefaultNullValues = []string{"", "NULL", "null"} + +// WithNullReader sets options for a CSV Reader pertaining to NULL value +// handling. If stringsCanBeNull is true, then a string that matches one of the +// nullValues set will be interpreted as NULL. Numeric columns will be checked +// for nulls in all cases. If no nullValues arguments are passed in, the +// defaults set in NewReader() will be kept. +// +// When no NULL values is given, the default set is taken from DefaultNullValues. +func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.stringsCanBeNull = stringsCanBeNull + + if len(nullValues) == 0 { + nullValues = DefaultNullValues + } + cfg.nulls = make([]string, len(nullValues)) + copy(cfg.nulls, nullValues) + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithColumnTypes allows specifying optional per-column types (disabling +// type inference on those columns). +// +// Will panic if used in conjunction with an explicit schema. +func WithColumnTypes(types map[string]arrow.DataType) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid)) + } + cfg.columnTypes = types + default: + panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid)) + } + } +} + +// WithIncludeColumns indicates the names of the columns from the AVRO file +// that should actually be read and converted (in the slice's order). +// If set and non-empty, columns not in this slice will be ignored. +// +// Will panic if used in conjunction with an explicit schema. +func WithIncludeColumns(cols []string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithIncludeColumns with explicit schema", arrow.ErrInvalid)) + } + cfg.columnFilter = cols + default: + panic(fmt.Errorf("%w: WithIncludeColumns only allowed on csv Reader", arrow.ErrInvalid)) + } + } +} Review Comment: I believe Avro isn't order-based; it's name-and-field-based, which means you can probably get away with reading this into a `map[string]struct{}` to create a "set" of fields. A sketch follows below.
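Something like this (sketch only; assumes the filter becomes a map rather than the slice in this diff):

```go
// includeSet turns the user-provided column names into a set for O(1)
// membership checks while mapping Avro fields to Arrow fields.
func includeSet(cols []string) map[string]struct{} {
	set := make(map[string]struct{}, len(cols))
	for _, c := range cols {
		set[c] = struct{}{}
	}
	return set
}

// Then, while converting fields:
//
//	if _, keep := filter[fieldName]; len(filter) > 0 && !keep {
//		continue // column was not requested, skip it
//	}
```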
########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} + +// WithTopLevel specifies whether to include an Avro schema's top level record +// or only its fields. +func WithTopLevel(b bool) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.topLevel = b + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values. +var DefaultNullValues = []string{"", "NULL", "null"} + +// WithNullReader sets options for a CSV Reader pertaining to NULL value +// handling. If stringsCanBeNull is true, then a string that matches one of the +// nullValues set will be interpreted as NULL. Numeric columns will be checked +// for nulls in all cases. If no nullValues arguments are passed in, the +// defaults set in NewReader() will be kept. +// +// When no NULL values is given, the default set is taken from DefaultNullValues. +func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.stringsCanBeNull = stringsCanBeNull + + if len(nullValues) == 0 { + nullValues = DefaultNullValues + } + cfg.nulls = make([]string, len(nullValues)) + copy(cfg.nulls, nullValues) + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithColumnTypes allows specifying optional per-column types (disabling +// type inference on those columns). +// +// Will panic if used in conjunction with an explicit schema.
+func WithColumnTypes(types map[string]arrow.DataType) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid)) + } + cfg.columnTypes = types + default: + panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid)) + } + } +} Review Comment: We don't need type inference since Avro has a specific typed schema, so this is unnecessary entirely. We should just define what the mapping between Avro types and Arrow types is; see the sketch below.
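For the primitives that could be as simple as the following (sketch only; the helper name is mine, and per the earlier comment it returns `arrow.ErrNotImplemented` instead of silently falling back to Binary):

```go
package avro

import (
	"fmt"

	"github.com/apache/arrow/go/v13/arrow"
)

// arrowTypeFromAvroPrimitive is a fixed mapping from Avro primitive type
// names to Arrow types; anything unknown is an explicit error.
func arrowTypeFromAvroPrimitive(name string) (arrow.DataType, error) {
	switch name {
	case "boolean":
		return arrow.FixedWidthTypes.Boolean, nil
	case "int":
		return arrow.PrimitiveTypes.Int32, nil
	case "long":
		return arrow.PrimitiveTypes.Int64, nil
	case "float":
		return arrow.PrimitiveTypes.Float32, nil
	case "double":
		return arrow.PrimitiveTypes.Float64, nil
	case "bytes":
		return arrow.BinaryTypes.Binary, nil
	case "string":
		return arrow.BinaryTypes.String, nil
	default:
		return nil, fmt.Errorf("%w: avro type %q", arrow.ErrNotImplemented, name)
	}
}
```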
########## go/arrow/avro/schema.go: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package avro reads Avro OCF files and presents the extracted data as records +package avro + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + + "github.com/apache/arrow/go/v13/arrow" +) + +type schemaNode struct { + name string + ofType interface{} + logicalType string + precision int32 + scale int32 + size int + symbols []string + fields []interface{} +} + +// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON. +// If the top level is of record type, set includeTopLevel to either make +// its fields top level fields in the resulting schema or nested in a single field. +func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) { + var m map[string]interface{} + var node schemaNode + json.Unmarshal(avroSchema, &m) + if m["type"].(string) == "record" { + if _, ok := m["fields"]; ok { + for _, field := range m["fields"].([]interface{}) { + node.fields = append(node.fields, field.(map[string]interface{})) + } + if len(node.fields) == 0 { + return nil, fmt.Errorf("invalid avro schema: no top level record fields found") + } + } + } + fields := iterateFields(node.fields) + return arrow.NewSchema(fields, nil), nil +} + +func iterateFields(f []interface{}) []arrow.Field { + var s []arrow.Field + for _, field := range f { + var n schemaNode + n.name = field.(map[string]interface{})["name"].(string) + n.ofType = field.(map[string]interface{})["type"] + switch n.ofType.(type) { + // Getting field type from within field object + case string: + switch n.ofType.(string) { + case "enum": + for _, symbol := range field.(map[string]interface{})["symbols"].([]interface{}) { + n.symbols = append(n.symbols, symbol.(string)) + } + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field type is an object + case map[string]interface{}: + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + case []interface{}: + + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field is of type "record" + if nf, f := field.(map[string]interface{})["fields"]; f { + switch nf.(type) { + // primitive & complex types + case map[string]interface{}: + for _, v := range nf.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + // type unions + default: + for _, v := range nf.([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + } + } + s = append(s, traverseNodes(n)) + } + return s +} + +func traverseNodes(node schemaNode) arrow.Field { + switch node.ofType.(type) { + case string: + // Avro primitive type + if len(node.fields) == 0 { + switch node.ofType.(string) { + case "boolean", "int", "long", "float", "double", "bytes", "string": + if node.logicalType != "" { + return avroLogicalToArrowField(node) + } + // Avro primitive type + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + case "fixed": + // Duration type is not supported in github.com/linkedin/goavro + // Implementing as
Binary for now. + switch node.logicalType { + case "decimal": + return avroLogicalToArrowField(node) + case "duration": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + //return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.MonthDayNanoInterval} + } + return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: node.size}} + case "enum": + symbols := make(map[string]string) + for index, symbol := range node.symbols { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := len(symbols) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxUint32: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)} + default: + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + } + } else { + // avro "record" type, node has "fields" array + if node.ofType.(string) == "record" { + var n schemaNode + n.name = node.name + n.ofType = node.ofType + if len(node.fields) > 0 { + n.fields = append(n.fields, node.fields...) + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Avro complex types + case map[string]interface{}: + //var n schemaNode + //n.name = node.name + //n.ofType = node.ofType.(map[string]interface{})["type"] + switch node.logicalType { + case "": + return avroComplexToArrowField(node) + default: + return avroLogicalToArrowField(node) + } + + // Avro union types + case []interface{}: + var unionTypes []string + for _, ft := range node.ofType.([]interface{}) { + switch ft.(type) { + // primitive types + case string: + if ft != "null" { + unionTypes = append(unionTypes, ft.(string)) + } + continue + // complex types + case map[string]interface{}: + var n schemaNode + n.name = node.name + n.ofType = ft.(map[string]interface{})["type"] + if _, f := ft.(map[string]interface{})["fields"]; f { + for _, field := range ft.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, field.(map[string]interface{})) + } + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Supported Avro union type is null + one other type. + // TODO: Complex AVRO union to Arrow Dense || Sparse Union. + if len(unionTypes) == 1 { + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(unionTypes[0])} + } else { + // BYTE_ARRAY is the catchall if union type is anything beyond null + one other type. + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + } + } + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} +} + +func avroLogicalToArrowField(node schemaNode) arrow.Field { + // Avro logical types + switch node.logicalType { + // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10-scale. + // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement + // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified + // using an attribute. 
+ // + // The following attributes are supported: + // scale, a JSON integer representing the scale (optional). If not specified the scale is 0. + // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required). + case "decimal": + if node.precision <= 38 { + return arrow.Field{Name: node.name, Type: &arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}} + } else { + return arrow.Field{Name: node.name, Type: &arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}} + } Review Comment: probably easier to simplify this function and switch to something like:

```go
var dt arrow.DataType
switch node.logicalType {
// ...
case "decimal":
	id := arrow.DECIMAL128
	if node.precision > 38 {
		id = arrow.DECIMAL256
	}
	dt, _ = arrow.NewDecimalType(id, node.precision, node.scale)
case "uuid":
	dt = types.NewUUIDType() // import "github.com/apache/arrow/go/v13/internal/types"
// etc.....
}
return arrow.Field{Name: node.name, Type: dt}
```

########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} + +// WithTopLevel specifies whether to include an Avro schema's top level record +// or only its fields. +func WithTopLevel(b bool) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.topLevel = b + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values.
+var DefaultNullValues = []string{"", "NULL", "null"} Review Comment: doesn't Avro have a concept of null already, so you don't need to interpret strings as null?
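Right, Avro expresses an optional value as a union with `null`, and goavro hands such values back as Go `nil` (or, for the non-null branch, a one-entry `map[string]interface{}` keyed by the branch's type name), so nullness never has to be inferred from string sentinels. A sketch under those assumptions, where `datum` stands in for a hypothetical decoded value:

```go
// import (
// 	"github.com/apache/arrow/go/v13/arrow/array"
// 	"github.com/apache/arrow/go/v13/arrow/memory"
// )
bldr := array.NewStringBuilder(memory.DefaultAllocator)
defer bldr.Release()
// datum: one value decoded by goavro from a ["null", "string"] union field.
switch v := datum.(type) {
case nil:
	bldr.AppendNull() // Avro null arrives as nil; no sentinel comparison needed
case map[string]interface{}:
	bldr.Append(v["string"].(string)) // non-null branches are keyed by type name
}
```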
########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} + +// WithTopLevel specifies whether to include an Avro schema's top level record +// or only its fields. +func WithTopLevel(b bool) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.topLevel = b + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values. +var DefaultNullValues = []string{"", "NULL", "null"} + +// WithNullReader sets options for a CSV Reader pertaining to NULL value +// handling. If stringsCanBeNull is true, then a string that matches one of the +// nullValues set will be interpreted as NULL. Numeric columns will be checked +// for nulls in all cases. If no nullValues arguments are passed in, the +// defaults set in NewReader() will be kept. +// +// When no NULL values is given, the default set is taken from DefaultNullValues.
+func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.stringsCanBeNull = stringsCanBeNull + + if len(nullValues) == 0 { + nullValues = DefaultNullValues + } + cfg.nulls = make([]string, len(nullValues)) + copy(cfg.nulls, nullValues) + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithColumnTypes allows specifying optional per-column types (disabling +// type inference on those columns). +// +// Will panic if used in conjunction with an explicit schema. +func WithColumnTypes(types map[string]arrow.DataType) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid)) + } + cfg.columnTypes = types + default: + panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid)) + } + } +} + +// WithIncludeColumns indicates the names of the columns from the AVRO file +// that should actually be read and converted (in the slice's order). +// If set and non-empty, columns not in this slice will be ignored. +// +// Will panic if used in conjunction with an explicit schema. +func WithIncludeColumns(cols []string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithIncludeColumns with explicit schema", arrow.ErrInvalid)) + } Review Comment: why do we need the explicit schema to do this? we have the avro schema, so we can throw an error if the field is not found, etc.
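One way that check could look, sketched with a hypothetical `avroFields` set built from the parsed Avro schema and a hypothetical `includeColumns` field on the reader config:

```go
// Sketch: validate the requested columns against the Avro schema itself,
// so no explicit Arrow schema is needed for the check.
for _, col := range cols {
	if _, ok := avroFields[col]; !ok {
		panic(fmt.Errorf("%w: column %q not found in avro schema", arrow.ErrInvalid, col))
	}
}
cfg.includeColumns = cols
```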
########## go/arrow/avro/schema.go: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package avro reads Avro OCF files and presents the extracted data as records +package avro + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + + "github.com/apache/arrow/go/v13/arrow" +) + +type schemaNode struct { + name string + ofType interface{} + logicalType string + precision int32 + scale int32 + size int + symbols []string + fields []interface{} +} + +// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema JSON. +// If the top level is of record type, set includeTopLevel to either make +// its fields top level fields in the resulting schema or nested in a single field.
+func ArrowSchemaFromAvro(avroSchema []byte) (*arrow.Schema, error) { + var m map[string]interface{} + var node schemaNode + json.Unmarshal(avroSchema, &m) + if m["type"].(string) == "record" { + if _, ok := m["fields"]; ok { + for _, field := range m["fields"].([]interface{}) { + node.fields = append(node.fields, field.(map[string]interface{})) + } + if len(node.fields) == 0 { + return nil, fmt.Errorf("invalid avro schema: no top level record fields found") + } + } + } + fields := iterateFields(node.fields) + return arrow.NewSchema(fields, nil), nil +} + +func iterateFields(f []interface{}) []arrow.Field { + var s []arrow.Field + for _, field := range f { + var n schemaNode + n.name = field.(map[string]interface{})["name"].(string) + n.ofType = field.(map[string]interface{})["type"] + switch n.ofType.(type) { + // Getting field type from within field object + case string: + switch n.ofType.(string) { + case "enum": + for _, symbol := range field.(map[string]interface{})["symbols"].([]interface{}) { + n.symbols = append(n.symbols, symbol.(string)) + } + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field type is an object + case map[string]interface{}: + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["type"].(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + case []interface{}: + + default: + if lt, ok := field.(map[string]interface{})["logicalType"]; ok { + n.logicalType = lt.(string) + } + if lt, ok := field.(map[string]interface{})["size"]; ok { + n.size = int(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["precision"]; ok { + n.precision = int32(lt.(float64)) + } + if lt, ok := field.(map[string]interface{})["scale"]; ok { + n.scale = int32(lt.(float64)) + } + } + // Field is of type "record" + if nf, f := field.(map[string]interface{})["fields"]; f { + switch nf.(type) { + // primitive & complex types + case map[string]interface{}: + for _, v := range nf.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + // type unions + default: + for _, v := range nf.([]interface{}) { + n.fields = append(n.fields, v.(map[string]interface{})) + } + } + } + s = append(s, traverseNodes(n)) + } + return s +} + +func traverseNodes(node schemaNode) arrow.Field { + switch node.ofType.(type) { + case string: + // Avro primitive type + if len(node.fields) == 0 { + switch node.ofType.(string) { + case "boolean", "int", "long", "float", "double", "bytes", "string": + if node.logicalType != "" { + return avroLogicalToArrowField(node) + } + // Avro primitive type + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + case "fixed": + // Duration type is not supported in github.com/linkedin/goavro + // Implementing as 
Binary for now. + switch node.logicalType { + case "decimal": + return avroLogicalToArrowField(node) + case "duration": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + //return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.MonthDayNanoInterval} + } + return arrow.Field{Name: node.name, Type: &arrow.FixedSizeBinaryType{ByteWidth: node.size}} + case "enum": + symbols := make(map[string]string) + for index, symbol := range node.symbols { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := len(symbols) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxUint32: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + return arrow.Field{Name: node.name, Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)} + default: + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(node.ofType.(string))} + } + } else { + // avro "record" type, node has "fields" array + if node.ofType.(string) == "record" { + var n schemaNode + n.name = node.name + n.ofType = node.ofType + if len(node.fields) > 0 { + n.fields = append(n.fields, node.fields...) + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Avro complex types + case map[string]interface{}: + //var n schemaNode + //n.name = node.name + //n.ofType = node.ofType.(map[string]interface{})["type"] + switch node.logicalType { + case "": + return avroComplexToArrowField(node) + default: + return avroLogicalToArrowField(node) + } + + // Avro union types + case []interface{}: + var unionTypes []string + for _, ft := range node.ofType.([]interface{}) { + switch ft.(type) { + // primitive types + case string: + if ft != "null" { + unionTypes = append(unionTypes, ft.(string)) + } + continue + // complex types + case map[string]interface{}: + var n schemaNode + n.name = node.name + n.ofType = ft.(map[string]interface{})["type"] + if _, f := ft.(map[string]interface{})["fields"]; f { + for _, field := range ft.(map[string]interface{})["fields"].([]interface{}) { + n.fields = append(n.fields, field.(map[string]interface{})) + } + } + f := iterateFields(n.fields) + return arrow.Field{Name: node.name, Type: arrow.StructOf(f...)} + } + } + // Supported Avro union type is null + one other type. + // TODO: Complex AVRO union to Arrow Dense || Sparse Union. + if len(unionTypes) == 1 { + return arrow.Field{Name: node.name, Type: AvroPrimitiveToArrowType(unionTypes[0])} + } else { + // BYTE_ARRAY is the catchall if union type is anything beyond null + one other type. + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} + } + } + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.Binary} +} + +func avroLogicalToArrowField(node schemaNode) arrow.Field { + // Avro logical types + switch node.logicalType { + // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10-scale. + // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement + // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified + // using an attribute. 
+ // + // The following attributes are supported: + // scale, a JSON integer representing the scale (optional). If not specified the scale is 0. + // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required). + case "decimal": + if node.precision <= 38 { + return arrow.Field{Name: node.name, Type: &arrow.Decimal128Type{Precision: node.precision, Scale: node.scale}} + } else { + return arrow.Field{Name: node.name, Type: &arrow.Decimal256Type{Precision: node.precision, Scale: node.scale}} + } + + // The uuid logical type represents a random generated universally unique identifier (UUID). + // A uuid logical type annotates an Avro string. The string has to conform with RFC-4122 + case "uuid": + return arrow.Field{Name: node.name, Type: arrow.BinaryTypes.String} + + // The date logical type represents a date within the calendar, with no reference to a particular + // time zone or time of day. + // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, + // 1 January 1970 (ISO calendar). + case "date": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Date32} + + // The time-millis logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one millisecond. + // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds + // after midnight, 00:00:00.000. + case "time-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time32ms} + + // The time-micros logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one microsecond. + // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds + // after midnight, 00:00:00.000000. + case "time-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Time64us} + + // The timestamp-millis logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one millisecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users in + // their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds + // from the unix epoch, 1 January 1970 00:00:00.000 UTC. + case "timestamp-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms} + // The timestamp-micros logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one microsecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users + // in their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds + // from the unix epoch, 1 January 1970 00:00:00.000000 UTC. 
+ case "timestamp-micros": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_us} + // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one millisecond. + // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of + // milliseconds, from 1 January 1970 00:00:00.000. + case "local-timestamp-millis": + return arrow.Field{Name: node.name, Type: arrow.FixedWidthTypes.Timestamp_ms} Review Comment: the default `arrow.FixedWidthTypes.Timestamp_ms` sets the timezone to "UTC". You should manually create `&arrow.TimestampType{Unit: arrow.Millisecond}` for the local timestamp (or maybe we should create `NoTZ` versions of these vars that you can use for this....? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]