loicalleyne commented on code in PR #36796: URL: https://github.com/apache/arrow/pull/36796#discussion_r1281132461
########## go/arrow/avro/common.go: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/memory" +) + +var ( + ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") +) + +// Option configures an Avro reader/writer. +type Option func(config) +type config interface{} + +// WithTopLevel specifies whether to include an Avro schema's top level record +// or only its fields. +func WithTopLevel(b bool) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.topLevel = b + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.mem = mem + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.chunk = n + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// DefaultNullValues is the set of values considered as NULL values by default +// when Reader is configured to handle NULL values. +var DefaultNullValues = []string{"", "NULL", "null"} + +// WithNullReader sets options for a CSV Reader pertaining to NULL value +// handling. If stringsCanBeNull is true, then a string that matches one of the +// nullValues set will be interpreted as NULL. Numeric columns will be checked +// for nulls in all cases. If no nullValues arguments are passed in, the +// defaults set in NewReader() will be kept. +// +// When no NULL values is given, the default set is taken from DefaultNullValues. +func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + cfg.stringsCanBeNull = stringsCanBeNull + + if len(nullValues) == 0 { + nullValues = DefaultNullValues + } + cfg.nulls = make([]string, len(nullValues)) + copy(cfg.nulls, nullValues) + default: + panic(fmt.Errorf("arrow/avro: unknown config type %T", cfg)) + } + } +} + +// WithColumnTypes allows specifying optional per-column types (disabling +// type inference on those columns). +// +// Will panic if used in conjunction with an explicit schema. +func WithColumnTypes(types map[string]arrow.DataType) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid)) + } + cfg.columnTypes = types + default: + panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid)) + } + } +} + +// WithIncludeColumns indicates the names of the columns from the AVRO file +// that should actually be read and converted (in the slice's order). +// If set and non-empty, columns not in this slice will be ignored. +// +// Will panic if used in conjunction with an explicit schema. +func WithIncludeColumns(cols []string) Option { + return func(cfg config) { + switch cfg := cfg.(type) { + case *OCFReader: + if cfg.schema != nil { + panic(fmt.Errorf("%w: cannot use WithIncludeColumns with explicit schema", arrow.ErrInvalid)) + } + cfg.columnFilter = cols + default: + panic(fmt.Errorf("%w: WithIncludeColumns only allowed on csv Reader", arrow.ErrInvalid)) + } + } +} + +func validate(schema *arrow.Schema) { + for i, f := range schema.Fields() { + switch ft := f.Type.(type) { + case *arrow.BooleanType: + case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type: + case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type: + case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type: + case *arrow.StringType, *arrow.LargeStringType: + case *arrow.TimestampType: + case *arrow.Date32Type, *arrow.Date64Type: + case *arrow.Decimal128Type, *arrow.Decimal256Type: + case *arrow.ListType, *arrow.LargeListType, *arrow.FixedSizeListType: + case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType: + case arrow.ExtensionType: + case *arrow.NullType: + default: + panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft)) + } + } +} Review Comment: removed in latest commit, left over from CSV implementation that was used as a skeleton for this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
