This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ab60eaea3 GH-36760: [Go] Add Avro OCF reader (#37115)
5ab60eaea3 is described below
commit 5ab60eaea3afaf1ff58e9f70bed481d6e726dd69
Author: loicalleyne <[email protected]>
AuthorDate: Mon Nov 27 11:02:44 2023 -0500
GH-36760: [Go] Add Avro OCF reader (#37115)
### Rationale for this change
### What changes are included in this PR?
Avro reader
### Are these changes tested?
Local integration tests, no unit tests yet.
### Are there any user-facing changes?
New Avro reader API
* Closes: #36760
Lead-authored-by: Loïc Alleyne <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Raúl Cumplido <[email protected]>
Co-authored-by: sgilmore10 <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: James Duong <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Dane Pitkin <[email protected]>
Co-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
Co-authored-by: Curt Hagenlocher <[email protected]>
Co-authored-by: Jacob Wujciak-Jens <[email protected]>
Co-authored-by: Hyunseok Seo <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: James Duong <[email protected]>
Co-authored-by: Nic Crane <[email protected]>
Co-authored-by: Ivan Chesnov <[email protected]>
Co-authored-by: Diego Fernández Giraldo <[email protected]>
Co-authored-by: Bryce Mecum <[email protected]>
Co-authored-by: Jonathan Keane <[email protected]>
Co-authored-by: Peter Andreas Entschev <[email protected]>
Co-authored-by: abandy <[email protected]>
Co-authored-by: Lei Hou <[email protected]>
Co-authored-by: Yue <[email protected]>
Co-authored-by: davidhcoe <[email protected]>
Co-authored-by: Tsutomu Katsube <[email protected]>
Co-authored-by: Divyansh200102 <[email protected]>
Co-authored-by: Thomas Newton <[email protected]>
Co-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: prmoore77 <[email protected]>
Co-authored-by: Jiaxing Liang <[email protected]>
Co-authored-by: orgadish <[email protected]>
Co-authored-by: Maximilian Muecke <[email protected]>
Co-authored-by: Gavin Murrison <[email protected]>
Co-authored-by: William Ayd <[email protected]>
Co-authored-by: Laurent Goujon <[email protected]>
Co-authored-by: Jin Shang <[email protected]>
Co-authored-by: Alexander Grueneberg <[email protected]>
Co-authored-by: Paul Spangler <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: Michael Lui <[email protected]>
Co-authored-by: patrick <[email protected]>
Co-authored-by: Dan Homola <[email protected]>
Co-authored-by: Ben Harkins <[email protected]>
Co-authored-by: Nick Hughes <[email protected]>
Co-authored-by: Fernando Mayer <[email protected]>
Co-authored-by: Rok Mihevc <[email protected]>
Co-authored-by: Francis <[email protected]>
Co-authored-by: Donald Tolley <[email protected]>
Co-authored-by: Judah Rand <[email protected]>
Co-authored-by: Eero Lihavainen <[email protected]>
Co-authored-by: Benjamin Schmidt <[email protected]>
Co-authored-by: Phillip LeBlanc <[email protected]>
Co-authored-by: Pierre Moulon <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Bryan Cutler <[email protected]>
Co-authored-by: Diogo Teles Sant'Anna <[email protected]>
Co-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Junming Chen <[email protected]>
Co-authored-by: Chris Larsen <[email protected]>
Co-authored-by: Dan Stone <[email protected]>
Co-authored-by: Tim Schaub <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Co-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Sarah Gilmore <[email protected]>
Co-authored-by: jeremy <[email protected]>
Co-authored-by: Dan Homola <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: anjakefala <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Co-authored-by: scoder <[email protected]>
Co-authored-by: voidstar69 <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Ivan Chesnov <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: David Li <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/avro/avro2parquet/main.go | 119 ++++
go/arrow/avro/loader.go | 85 +++
go/arrow/avro/reader.go | 337 +++++++++
go/arrow/avro/reader_test.go | 364 ++++++++++
go/arrow/avro/reader_types.go | 875 ++++++++++++++++++++++++
go/arrow/avro/schema.go | 429 ++++++++++++
go/arrow/avro/schema_test.go | 362 ++++++++++
go/arrow/avro/testdata/arrayrecordmap.avro | Bin 0 -> 582 bytes
go/arrow/avro/testdata/githubsamplecommits.avro | Bin 0 -> 95131 bytes
go/go.mod | 9 +
go/go.sum | 21 +
11 files changed, 2601 insertions(+)
diff --git a/go/arrow/avro/avro2parquet/main.go b/go/arrow/avro/avro2parquet/main.go
new file mode 100644
index 0000000000..45377b46a4
--- /dev/null
+++ b/go/arrow/avro/avro2parquet/main.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "runtime/pprof"
+ "time"
+
+ "github.com/apache/arrow/go/v15/arrow/avro"
+ "github.com/apache/arrow/go/v15/parquet"
+ "github.com/apache/arrow/go/v15/parquet/compress"
+ pq "github.com/apache/arrow/go/v15/parquet/pqarrow"
+)
+
+var (
+ cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
+ filepath = flag.String("file", "", "avro ocf to convert")
+)
+
+func main() {
+ flag.Parse()
+ if *cpuprofile != "" {
+ f, err := os.Create(*cpuprofile)
+ if err != nil {
+ log.Fatal("could not create CPU profile: ", err)
+ }
+ defer f.Close() // error handling omitted for example
+ if err := pprof.StartCPUProfile(f); err != nil {
+ log.Fatal("could not start CPU profile: ", err)
+ }
+ defer pprof.StopCPUProfile()
+ }
+ if *filepath == "" {
+ log.Fatal("no file specified")
+ }
+ chunk := 1024 * 8
+ ts := time.Now()
+ log.Println("starting:")
+ info, err := os.Stat(*filepath)
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(1)
+ }
+ filesize := info.Size()
+ data, err := os.ReadFile(*filepath)
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(2)
+ }
+ fmt.Printf("file : %v\nsize: %v MB\n", *filepath, float64(filesize)/1024/1024)
+
+ r := bytes.NewReader(data)
+ ior := bufio.NewReaderSize(r, 4096*8)
+ av2arReader, err := avro.NewOCFReader(ior, avro.WithChunk(chunk))
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(3)
+ }
+ fp, err := os.OpenFile(*filepath+".parquet", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(4)
+ }
+ defer fp.Close()
+ pwProperties := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true),
+ parquet.WithVersion(parquet.V2_LATEST),
+ parquet.WithCompression(compress.Codecs.Snappy),
+ parquet.WithBatchSize(1024*32),
+ parquet.WithDataPageSize(1024*1024),
+ parquet.WithMaxRowGroupLength(64*1024*1024),
+ )
+ awProperties := pq.NewArrowWriterProperties(pq.WithStoreSchema())
+ pr, err := pq.NewFileWriter(av2arReader.Schema(), fp, pwProperties, awProperties)
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(5)
+ }
+ defer pr.Close()
+ fmt.Printf("parquet version: %v\n", pwProperties.Version())
+ for av2arReader.Next() {
+ if av2arReader.Err() != nil {
+ fmt.Println(av2arReader.Err())
+ os.Exit(6)
+ }
+ recs := av2arReader.Record()
+ err = pr.WriteBuffered(recs)
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(7)
+ }
+ recs.Release()
+ }
+ if av2arReader.Err() != nil {
+ fmt.Println(av2arReader.Err())
+ }
+
+ pr.Close()
+ log.Printf("time to convert: %v\n", time.Since(ts))
+}
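The conversion loop above follows the usual record-reader contract: call `Next` until it returns false, use `Record` only inside the loop, and check `Err` once iteration stops. A minimal stdlib-only sketch of that contract, assuming a hypothetical `recordSource` interface and a slice-backed `sliceSource` stand-in rather than the real `OCFReader` (no Arrow types are involved):

```go
package main

import (
	"errors"
	"fmt"
)

// recordSource is a hypothetical interface mirroring the Next/Record/Err
// shape of a record reader; it is not part of the Arrow API.
type recordSource interface {
	Next() bool
	Record() []string
	Err() error
}

// sliceSource yields pre-built rows and can simulate a decode failure.
type sliceSource struct {
	rows   [][]string
	pos    int
	cur    []string
	failAt int // row index at which to fail; -1 disables the failure
	err    error
}

func (s *sliceSource) Next() bool {
	s.cur = nil
	if s.err != nil || s.pos >= len(s.rows) {
		return false
	}
	if s.pos == s.failAt {
		s.err = errors.New("decode failure")
		return false
	}
	s.cur = s.rows[s.pos]
	s.pos++
	return true
}

func (s *sliceSource) Record() []string { return s.cur }
func (s *sliceSource) Err() error       { return s.err }

// drain hands every record to sink and only consults Err after Next has
// returned false, so a mid-stream failure is never mistaken for EOF.
func drain(src recordSource, sink func([]string) error) (int, error) {
	n := 0
	for src.Next() {
		if err := sink(src.Record()); err != nil {
			return n, err
		}
		n++
	}
	return n, src.Err()
}

func main() {
	src := &sliceSource{rows: [][]string{{"a"}, {"b"}, {"c"}}, failAt: -1}
	n, err := drain(src, func(r []string) error { return nil })
	fmt.Println(n, err) // 3 <nil>
}
```

Checking `Err` after the loop, rather than printing a stale local `err`, is what distinguishes "no more records" from "decoding stopped early".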
diff --git a/go/arrow/avro/loader.go b/go/arrow/avro/loader.go
new file mode 100644
index 0000000000..26d8678e8e
--- /dev/null
+++ b/go/arrow/avro/loader.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "io"
+)
+
+func (r *OCFReader) decodeOCFToChan() {
+ defer close(r.avroChan)
+ for r.r.HasNext() {
+ select {
+ case <-r.readerCtx.Done():
+ r.err = fmt.Errorf("avro decoding cancelled, %d records read", r.avroDatumCount)
+ return
+ default:
+ var datum any
+ err := r.r.Decode(&datum)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ r.err = nil
+ return
+ }
+ r.err = err
+ return
+ }
+ r.avroChan <- datum
+ r.avroDatumCount++
+ }
+ }
+}
+
+func (r *OCFReader) recordFactory() {
+ defer close(r.recChan)
+ r.primed = true
+ recChunk := 0
+ switch {
+ case r.chunk < 1:
+ for data := range r.avroChan {
+ err := r.ldr.loadDatum(data)
+ if err != nil {
+ r.err = err
+ return
+ }
+ }
+ r.recChan <- r.bld.NewRecord()
+ r.bldDone <- struct{}{}
+ case r.chunk >= 1:
+ for data := range r.avroChan {
+ if recChunk == 0 {
+ r.bld.Reserve(r.chunk)
+ }
+ err := r.ldr.loadDatum(data)
+ if err != nil {
+ r.err = err
+ return
+ }
+ recChunk++
+ if recChunk >= r.chunk {
+ r.recChan <- r.bld.NewRecord()
+ recChunk = 0
+ }
+ }
+ if recChunk != 0 {
+ r.recChan <- r.bld.NewRecord()
+ }
+ r.bldDone <- struct{}{}
+ }
+}
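`decodeOCFToChan` and `recordFactory` form a two-stage pipeline: one goroutine pushes decoded datums into a buffered channel while honoring context cancellation, and a second groups them into records of `chunk` rows, or into a single record when `chunk < 1`. The sketch below reproduces only that batching logic with the standard library; `produce`, `batch`, and `batchSizes` are hypothetical names, and plain `[]any` slices stand in for Arrow record builders.

```go
package main

import (
	"context"
	"fmt"
)

// produce sends items on a buffered channel until the input is exhausted
// or ctx is cancelled, then closes the channel.
func produce(ctx context.Context, items []any, buf int) <-chan any {
	out := make(chan any, buf)
	go func() {
		defer close(out)
		for _, it := range items {
			select {
			case <-ctx.Done():
				return
			case out <- it:
			}
		}
	}()
	return out
}

// batch groups values from in into slices of chunk elements. A chunk below 1
// collects everything into one batch, like the chunk < 1 branch of
// recordFactory (unlike recordFactory, empty input yields no batch here).
func batch(in <-chan any, chunk int) <-chan []any {
	out := make(chan []any)
	go func() {
		defer close(out)
		var cur []any
		for v := range in {
			cur = append(cur, v)
			if chunk >= 1 && len(cur) >= chunk {
				out <- cur
				cur = nil
			}
		}
		if len(cur) > 0 {
			out <- cur // flush the final partial batch
		}
	}()
	return out
}

// batchSizes runs n items through the pipeline and reports each batch size.
func batchSizes(n, chunk int) []int {
	items := make([]any, n)
	for i := range items {
		items[i] = i
	}
	var sizes []int
	for b := range batch(produce(context.Background(), items, 8), chunk) {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(10, 4)) // [4 4 2]
	fmt.Println(batchSizes(10, 0)) // [10]
}
```

Buffering the first channel lets decoding run ahead of conversion, which is the role `avroChanSize` plays in the reader.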
diff --git a/go/arrow/avro/reader.go b/go/arrow/avro/reader.go
new file mode 100644
index 0000000000..e72a5632bd
--- /dev/null
+++ b/go/arrow/avro/reader.go
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "sync/atomic"
+
+ "github.com/apache/arrow/go/v15/arrow"
+ "github.com/apache/arrow/go/v15/arrow/array"
+ "github.com/apache/arrow/go/v15/arrow/internal/debug"
+ "github.com/apache/arrow/go/v15/arrow/memory"
+ "github.com/hamba/avro/v2/ocf"
+ "github.com/tidwall/sjson"
+
+ avro "github.com/hamba/avro/v2"
+)
+
+var ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")
+
+// Option configures an Avro reader/writer.
+type (
+ Option func(config)
+ config *OCFReader
+)
+
+type schemaEdit struct {
+ method string
+ path string
+ value any
+}
+
+// OCFReader wraps a hamba/avro ocf.Decoder and creates arrow.Records from a schema.
+type OCFReader struct {
+ r *ocf.Decoder
+ avroSchema string
+ avroSchemaEdits []schemaEdit
+ schema *arrow.Schema
+
+ refs int64
+ bld *array.RecordBuilder
+ bldMap *fieldPos
+ ldr *dataLoader
+ cur arrow.Record
+ err error
+
+ primed bool
+ readerCtx context.Context
+ readCancel func()
+ maxOCF int
+ maxRec int
+
+ avroChan chan any
+ avroDatumCount int64
+ avroChanSize int
+ recChan chan arrow.Record
+
+ bldDone chan struct{}
+
+ recChanSize int
+ chunk int
+ mem memory.Allocator
+}
+
+// NewOCFReader returns a reader that reads from an Avro OCF file and creates
+// arrow.Records from the converted avro data.
+func NewOCFReader(r io.Reader, opts ...Option) (*OCFReader, error) {
+ ocfr, err := ocf.NewDecoder(r)
+ if err != nil {
+ return nil, fmt.Errorf("%w: could not create avro ocfreader", arrow.ErrInvalid)
+ }
+
+ rr := &OCFReader{
+ r: ocfr,
+ refs: 1,
+ chunk: 1,
+ avroChanSize: 500,
+ recChanSize: 10,
+ }
+ for _, opt := range opts {
+ opt(rr)
+ }
+
+ rr.avroChan = make(chan any, rr.avroChanSize)
+ rr.recChan = make(chan arrow.Record, rr.recChanSize)
+ rr.bldDone = make(chan struct{})
+ schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"]))
+ if err != nil {
+ return nil, fmt.Errorf("%w: could not parse avro header", arrow.ErrInvalid)
+ }
+ rr.avroSchema = schema.String()
+ if len(rr.avroSchemaEdits) > 0 {
+ // execute schema edits
+ for _, e := range rr.avroSchemaEdits {
+ err := rr.editAvroSchema(e)
+ if err != nil {
+ return nil, fmt.Errorf("%w: could not edit avro schema", arrow.ErrInvalid)
+ }
+ }
+ // validate edited schema
+ schema, err = avro.Parse(rr.avroSchema)
+ if err != nil {
+ return nil, fmt.Errorf("%w: could not parse modified avro schema", arrow.ErrInvalid)
+ }
+ }
+ rr.schema, err = ArrowSchemaFromAvro(schema)
+ if err != nil {
+ return nil, fmt.Errorf("%w: could not convert avro schema", arrow.ErrInvalid)
+ }
+ if rr.mem == nil {
+ rr.mem = memory.DefaultAllocator
+ }
+ rr.readerCtx, rr.readCancel = context.WithCancel(context.Background())
+ go rr.decodeOCFToChan()
+
+ rr.bld = array.NewRecordBuilder(rr.mem, rr.schema)
+ rr.bldMap = newFieldPos()
+ rr.ldr = newDataLoader()
+ for idx, fb := range rr.bld.Fields() {
+ mapFieldBuilders(fb, rr.schema.Field(idx), rr.bldMap)
+ }
+ rr.ldr.drawTree(rr.bldMap)
+ go rr.recordFactory()
+ return rr, nil
+}
+
+// Reuse allows the OCFReader to be reused to read another Avro file provided the
+// new Avro file has an identical schema.
+func (rr *OCFReader) Reuse(r io.Reader, opts ...Option) error {
+ rr.Close()
+ rr.err = nil
+ ocfr, err := ocf.NewDecoder(r)
+ if err != nil {
+ return fmt.Errorf("%w: could not create avro ocfreader", arrow.ErrInvalid)
+ }
+ schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"]))
+ if err != nil {
+ return fmt.Errorf("%w: could not parse avro header", arrow.ErrInvalid)
+ }
+ if rr.avroSchema != schema.String() {
+ return fmt.Errorf("%w: avro schema mismatch", arrow.ErrInvalid)
+ }
+
+ rr.r = ocfr
+ for _, opt := range opts {
+ opt(rr)
+ }
+
+ rr.maxOCF = 0
+ rr.maxRec = 0
+ rr.avroDatumCount = 0
+ rr.primed = false
+
+ rr.avroChan = make(chan any, rr.avroChanSize)
+ rr.recChan = make(chan arrow.Record, rr.recChanSize)
+ rr.bldDone = make(chan struct{})
+
+ rr.readerCtx, rr.readCancel = context.WithCancel(context.Background())
+ go rr.decodeOCFToChan()
+ go rr.recordFactory()
+ return nil
+}
+
+// Err returns the last error encountered during the iteration over the
+// underlying Avro file.
+func (r *OCFReader) Err() error { return r.err }
+
+// AvroSchema returns the Avro schema of the Avro OCF
+func (r *OCFReader) AvroSchema() string { return r.avroSchema }
+
+// Schema returns the converted Arrow schema of the Avro OCF
+func (r *OCFReader) Schema() *arrow.Schema { return r.schema }
+
+// Record returns the current record that has been extracted from the
+// underlying Avro OCF file.
+// It is valid until the next call to Next.
+func (r *OCFReader) Record() arrow.Record { return r.cur }
+
+// Metrics returns the maximum queue depth of the Avro record read cache and of the
+// converted Arrow record cache.
+func (r *OCFReader) Metrics() string {
+ return fmt.Sprintf("Max. OCF queue depth: %d/%d Max. record queue depth: %d/%d", r.maxOCF, r.avroChanSize, r.maxRec, r.recChanSize)
+}
+
+// OCFRecordsReadCount returns the number of Avro datums that were read from the Avro file.
+func (r *OCFReader) OCFRecordsReadCount() int64 { return r.avroDatumCount }
+
+// Close closes the OCFReader's Avro record read cache and converted Arrow record cache. OCFReader must
+// be closed if the Avro OCF's records have not been read to completion.
+func (r *OCFReader) Close() {
+ r.readCancel()
+ r.err = r.readerCtx.Err()
+}
+
+func (r *OCFReader) editAvroSchema(e schemaEdit) error {
+ var err error
+ switch e.method {
+ case "set":
+ r.avroSchema, err = sjson.Set(r.avroSchema, e.path, e.value)
+ if err != nil {
+ return fmt.Errorf("%w: schema edit 'set %s = %v' failure - %v", arrow.ErrInvalid, e.path, e.value, err)
+ }
+ case "delete":
+ r.avroSchema, err = sjson.Delete(r.avroSchema, e.path)
+ if err != nil {
+ return fmt.Errorf("%w: schema edit 'delete' failure - %v", arrow.ErrInvalid, err)
+ }
+ default:
+ return fmt.Errorf("%w: schema edit method must be 'set' or 'delete'", arrow.ErrInvalid)
+ }
+ return nil
+}
+
+// Next returns whether a Record can be received from the converted record queue.
+// The user should check Err() after a call to Next that returns false to check
+// whether an error took place.
+func (r *OCFReader) Next() bool {
+ if r.cur != nil {
+ r.cur.Release()
+ r.cur = nil
+ }
+ if r.maxOCF < len(r.avroChan) {
+ r.maxOCF = len(r.avroChan)
+ }
+ if r.maxRec < len(r.recChan) {
+ r.maxRec = len(r.recChan)
+ }
+ select {
+ case r.cur = <-r.recChan:
+ case <-r.bldDone:
+ if len(r.recChan) > 0 {
+ r.cur = <-r.recChan
+ }
+ }
+ if r.err != nil {
+ return false
+ }
+
+ return r.cur != nil
+}
+
+// WithAllocator specifies the Arrow memory allocator used while building records.
+func WithAllocator(mem memory.Allocator) Option {
+ return func(cfg config) {
+ cfg.mem = mem
+ }
+}
+
+// WithReadCacheSize specifies the size of the OCF record decode queue; the default
+// value is 500, which is also used when n is less than 1.
+func WithReadCacheSize(n int) Option {
+ return func(cfg config) {
+ if n < 1 {
+ cfg.avroChanSize = 500
+ } else {
+ cfg.avroChanSize = n
+ }
+ }
+}
+
+// WithRecordCacheSize specifies the size of the converted Arrow record queue
+// (default 10); values less than 1 are replaced with 1.
+func WithRecordCacheSize(n int) Option {
+ return func(cfg config) {
+ if n < 1 {
+ cfg.recChanSize = 1
+ } else {
+ cfg.recChanSize = n
+ }
+ }
+}
+
+// WithSchemaEdit specifies modifications to the Avro schema. Supported methods are 'set' and
+// 'delete'. Set sets the value for the specified path. Delete deletes the value for the specified path.
+// A path is in dot syntax, such as "fields.1" or "fields.0.type". The modified Avro schema is
+// validated before conversion to Arrow schema; NewOCFReader will return an error if the modified schema
+// cannot be parsed.
+func WithSchemaEdit(method, path string, value any) Option {
+ return func(cfg config) {
+ var e schemaEdit
+ e.method = method
+ e.path = path
+ e.value = value
+ cfg.avroSchemaEdits = append(cfg.avroSchemaEdits, e)
+ }
+}
+
+// WithChunk specifies the chunk size used while reading Avro OCF files.
+//
+// If n is 1 (the default), no chunking will take place and the reader will
+// create one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is zero or negative, the reader will load the whole Avro OCF file into
+// memory and create one big record with all the rows.
+func WithChunk(n int) Option {
+ return func(cfg config) {
+ cfg.chunk = n
+ }
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (r *OCFReader) Retain() {
+ atomic.AddInt64(&r.refs, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (r *OCFReader) Release() {
+ debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+
+ if atomic.AddInt64(&r.refs, -1) == 0 {
+ if r.cur != nil {
+ r.cur.Release()
+ }
+ }
+}
+
+var _ array.RecordReader = (*OCFReader)(nil)
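`Retain` and `Release` use Arrow's atomic reference-counting convention: the count starts at 1, each `Retain` adds one, and the held resource is released when `Release` brings the count to zero. A stdlib-only sketch of that pattern, using a hypothetical `refCounted` type whose `free` callback stands in for releasing the current record:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted is a hypothetical type illustrating atomic reference counting.
type refCounted struct {
	refs int64
	free func() // invoked exactly once, when refs drops to zero
}

func newRefCounted(free func()) *refCounted {
	return &refCounted{refs: 1, free: free}
}

// Retain increments the count; it is safe to call from multiple goroutines.
func (r *refCounted) Retain() { atomic.AddInt64(&r.refs, 1) }

// Release decrements the count and frees the resource when it reaches zero.
func (r *refCounted) Release() {
	n := atomic.AddInt64(&r.refs, -1)
	if n < 0 {
		panic("too many releases")
	}
	if n == 0 && r.free != nil {
		r.free()
	}
}

func main() {
	freed := 0
	rc := newRefCounted(func() { freed++ })
	rc.Retain()
	rc.Release()
	fmt.Println(freed) // 0: one reference is still outstanding
	rc.Release()
	fmt.Println(freed) // 1
}
```

Using the value returned by `atomic.AddInt64` (rather than a separate load) ensures only the goroutine that performs the final decrement runs `free`. The reader's own `Release` checks for over-release with `debug.Assert`, which is active only in debug builds; the sketch panics unconditionally instead.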
diff --git a/go/arrow/avro/reader_test.go b/go/arrow/avro/reader_test.go
new file mode 100644
index 0000000000..e94d4f48fb
--- /dev/null
+++ b/go/arrow/avro/reader_test.go
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/apache/arrow/go/v15/arrow"
+ hamba "github.com/hamba/avro/v2"
+)
+
+func TestEditSchemaStringEqual(t *testing.T) {
+ tests := []struct {
+ avroSchema string
+ arrowSchema []arrow.Field
+ }{
+ {
+ avroSchema: `{
+ "fields": [
+ {
+ "name": "inheritNull",
+ "type": {
+ "name": "Simple",
+ "symbols": [
+ "a",
+ "b"
+ ],
+ "type": "enum"
+ }
+ },
+ {
+ "name": "explicitNamespace",
+ "type": {
+ "name": "test",
+ "namespace": "org.hamba.avro",
+ "size": 12,
+ "type": "fixed"
+ }
+ },
+ {
+ "name": "fullName",
+ "type": {
+ "type": "record",
+ "name": "fullName_data",
+ "namespace": "ignored",
+ "doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.",
+ "fields": [{
+ "name": "inheritNamespace",
+ "type": {
+ "type": "enum",
+ "name": "Understanding",
+ "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.",
+ "symbols": ["d", "e"]
+ }
+ }, {
+ "name": "md5",
+ "type": {
+ "name": "md5_data",
+ "type": "fixed",
+ "size": 16,
+ "namespace": "ignored"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "bigId",
+ "type": "long"
+ },
+ {
+ "name": "temperature",
+ "type": [
+ "null",
+ "float"
+ ]
+ },
+ {
+ "name": "fraction",
+ "type": [
+ "null",
+ "double"
+ ]
+ },
+ {
+ "name": "is_emergency",
+ "type": "boolean"
+ },
+ {
+ "name": "remote_ip",
+ "type": [
+ "null",
+ "bytes"
+ ]
+ },
+ {
+ "name": "person",
+ "type": {
+ "fields": [
+ {
+ "name": "lastname",
+ "type": "string"
+ },
+ {
+ "name": "address",
+ "type": {
+ "fields": [
+ {
+ "name": "streetaddress",
+ "type": "string"
+ },
+ {
+ "name": "city",
+ "type": "string"
+ }
+ ],
+ "name": "AddressUSRecord",
+ "type": "record"
+ }
+ },
+ {
+ "name": "mapfield",
+ "type": {
+ "default": {
+ },
+ "type": "map",
+ "values": "long"
+ }
+ },
+ {
+ "name": "arrayField",
+ "type": {
+ "default": [
+ ],
+ "items": "string",
+ "type": "array"
+ }
+ }
+ ],
+ "name": "person_data",
+ "type": "record"
+ }
+ },
+ {
+ "name": "decimalField",
+ "type": {
+ "logicalType": "decimal",
+ "precision": 4,
+ "scale": 2,
+ "type": "bytes"
+ }
+ },
+ {
+ "logicalType": "uuid",
+ "name": "uuidField",
+ "type": "string"
+ },
+ {
+ "name": "timemillis",
+ "type": {
+ "type": "int",
+ "logicalType": "time-millis"
+ }
+ },
+ {
+ "name": "timemicros",
+ "type": {
+ "type": "long",
+ "logicalType": "time-micros"
+ }
+ },
+ {
+ "name": "timestampmillis",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ }
+ },
+ {
+ "name": "timestampmicros",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-micros"
+ }
+ },
+ {
+ "name": "duration",
+ "type": {
+ "name": "duration",
+ "namespace": "whyowhy",
+ "logicalType": "duration",
+ "size": 12,
+ "type": "fixed"
+ }
+ },
+ {
+ "name": "date",
+ "type": {
+ "logicalType": "date",
+ "type": "int"
+ }
+ }
+ ],
+ "name": "Example",
+ "type": "record"
+ }`,
+ arrowSchema: []arrow.Field{
+ {
+ Name: "explicitNamespace",
+ Type: &arrow.FixedSizeBinaryType{ByteWidth: 12},
+ },
+ {
+ Name: "fullName",
+ Type: arrow.StructOf(
+ arrow.Field{
+ Name: "inheritNamespace",
+ Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String, Ordered: false},
+ },
+ arrow.Field{
+ Name: "md5",
+ Type: &arrow.FixedSizeBinaryType{ByteWidth: 16},
+ },
+ ),
+ },
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ },
+ {
+ Name: "bigId",
+ Type: arrow.PrimitiveTypes.Int64,
+ },
+ {
+ Name: "temperature",
+ Type: arrow.PrimitiveTypes.Float32,
+ Nullable: true,
+ },
+ {
+ Name: "fraction",
+ Type: arrow.PrimitiveTypes.Float64,
+ Nullable: true,
+ },
+ {
+ Name: "is_emergency",
+ Type: arrow.FixedWidthTypes.Boolean,
+ },
+ {
+ Name: "remote_ip",
+ Type: arrow.BinaryTypes.Binary,
+ Nullable: true,
+ },
+ {
+ Name: "person",
+ Type: arrow.StructOf(
+ arrow.Field{
+ Name: "lastname",
+ Type: arrow.BinaryTypes.String,
+ },
+ arrow.Field{
+ Name: "address",
+ Type: arrow.StructOf(
+ arrow.Field{
+ Name: "streetaddress",
+ Type: arrow.BinaryTypes.String,
+ },
+ arrow.Field{
+ Name: "city",
+ Type: arrow.BinaryTypes.String,
+ },
+ ),
+ },
+ arrow.Field{
+ Name: "mapfield",
+ Type: arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int64),
+ Nullable: true,
+ },
+ arrow.Field{
+ Name: "arrayField",
+ Type: arrow.ListOfNonNullable(arrow.BinaryTypes.String),
+ },
+ ),
+ },
+ {
+ Name: "decimalField",
+ Type: &arrow.Decimal128Type{Precision: 4, Scale: 2},
+ },
+ {
+ Name: "uuidField",
+ Type: arrow.BinaryTypes.String,
+ },
+ {
+ Name: "timemillis",
+ Type: arrow.FixedWidthTypes.Time32ms,
+ },
+ {
+ Name: "timemicros",
+ Type: arrow.FixedWidthTypes.Time64us,
+ },
+ {
+ Name: "timestampmillis",
+ Type: arrow.FixedWidthTypes.Timestamp_ms,
+ },
+ {
+ Name: "timestampmicros",
+ Type: arrow.FixedWidthTypes.Timestamp_us,
+ },
+ {
+ Name: "duration",
+ Type: arrow.FixedWidthTypes.MonthDayNanoInterval,
+ },
+ {
+ Name: "date",
+ Type: arrow.FixedWidthTypes.Date32,
+ },
+ },
+ },
+ }
+
+ for _, test := range tests {
+ t.Run("", func(t *testing.T) {
+ want := arrow.NewSchema(test.arrowSchema, nil)
+
+ schema, err := hamba.ParseBytes([]byte(test.avroSchema))
+ if err != nil {
+ t.Fatalf("%v", err)
+ }
+ r := new(OCFReader)
+ r.avroSchema = schema.String()
+ r.editAvroSchema(schemaEdit{method: "delete", path: "fields.0"})
+ schema, err = hamba.Parse(r.avroSchema)
+ if err != nil {
+ t.Fatalf("%v: could not parse modified avro schema", arrow.ErrInvalid)
+ }
+ got, err := ArrowSchemaFromAvro(schema)
+ if err != nil {
+ t.Fatalf("%v", err)
+ }
+ if !(fmt.Sprintf("%+v", want.String()) == fmt.Sprintf("%+v", got.String())) {
+ t.Fatalf("got=%v,\n want=%v", got.String(), want.String())
+ } else {
+ t.Logf("schema.String() comparison passed")
+ }
+ })
+ }
+}
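The test above calls `editAvroSchema` with method `delete` and path `fields.0`, which relies on `github.com/tidwall/sjson` to drop the first entry of the record's `fields` array before the schema is converted to Arrow. A minimal stdlib-only sketch of that single edit, assuming the schema is a JSON object with a top-level `fields` array; `deleteField` and `fieldNames` are hypothetical helpers, not part of the package, and unlike sjson this version re-serializes the whole document, so key order and whitespace are not preserved.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deleteField removes fields[idx] from an Avro record schema given as JSON.
// It is a hypothetical stand-in for sjson.Delete(schema, "fields.<idx>").
func deleteField(schema string, idx int) (string, error) {
	var doc map[string]any
	if err := json.Unmarshal([]byte(schema), &doc); err != nil {
		return "", err
	}
	fields, ok := doc["fields"].([]any)
	if !ok {
		return "", fmt.Errorf("schema has no fields array")
	}
	if idx < 0 || idx >= len(fields) {
		return "", fmt.Errorf("field index %d out of range", idx)
	}
	// The full slice expression forces append to copy instead of aliasing.
	doc["fields"] = append(fields[:idx:idx], fields[idx+1:]...)
	out, err := json.Marshal(doc)
	return string(out), err
}

// fieldNames lists the "name" of each entry in the fields array.
func fieldNames(schema string) []string {
	var doc struct {
		Fields []struct {
			Name string `json:"name"`
		} `json:"fields"`
	}
	if err := json.Unmarshal([]byte(schema), &doc); err != nil {
		return nil
	}
	names := make([]string, 0, len(doc.Fields))
	for _, f := range doc.Fields {
		names = append(names, f.Name)
	}
	return names
}

func main() {
	in := `{"type":"record","name":"Example","fields":[
		{"name":"inheritNull","type":"string"},
		{"name":"id","type":"int"}]}`
	out, err := deleteField(in, 0)
	if err != nil {
		panic(err)
	}
	fmt.Println(fieldNames(out)) // [id]
}
```

Because the edited text is parsed again before conversion (as `NewOCFReader` does with `avro.Parse`), a path that produces an invalid Avro schema surfaces as an error rather than as a silently wrong Arrow schema.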
diff --git a/go/arrow/avro/reader_types.go b/go/arrow/avro/reader_types.go
new file mode 100644
index 0000000000..5658c6e587
--- /dev/null
+++ b/go/arrow/avro/reader_types.go
@@ -0,0 +1,875 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math/big"
+
+ "github.com/apache/arrow/go/v15/arrow"
+ "github.com/apache/arrow/go/v15/arrow/array"
+ "github.com/apache/arrow/go/v15/arrow/decimal128"
+ "github.com/apache/arrow/go/v15/arrow/decimal256"
+ "github.com/apache/arrow/go/v15/arrow/memory"
+ "github.com/apache/arrow/go/v15/internal/types"
+)
+
+type dataLoader struct {
+ idx, depth int32
+ list *fieldPos
+ item *fieldPos
+ mapField *fieldPos
+ mapKey *fieldPos
+ mapValue *fieldPos
+ fields []*fieldPos
+ children []*dataLoader
+}
+
+var (
+ ErrNullStructData = errors.New("null struct data")
+)
+
+func newDataLoader() *dataLoader { return &dataLoader{idx: 0, depth: 0} }
+
+// drawTree takes the tree of field builders produced by mapFieldBuilders()
+// and produces another tree structure and aggregates fields whose values can
+// be retrieved from a `map[string]any` into a slice of builders, and creates a hierarchy to
+// deal with nested types (lists and maps).
+func (d *dataLoader) drawTree(field *fieldPos) {
+ for _, f := range field.children() {
+ if f.isList || f.isMap {
+ if f.isList {
+ c := d.newListChild(f)
+ if !f.childrens[0].isList {
+ c.item = f.childrens[0]
+ c.drawTree(f.childrens[0])
+ } else {
+ c.drawTree(f.childrens[0].childrens[0])
+ }
+ }
+ if f.isMap {
+ c := d.newMapChild(f)
+ if !arrow.IsNested(f.childrens[1].builder.Type().ID()) {
+ c.mapKey = f.childrens[0]
+ c.mapValue = f.childrens[1]
+ } else {
+ c.mapKey = f.childrens[0]
+ m := c.newChild()
+ m.mapValue = f.childrens[1]
+ m.drawTree(f.childrens[1])
+ }
+ }
+ } else {
+ d.fields = append(d.fields, f)
+ if len(f.children()) > 0 {
+ d.drawTree(f)
+ }
+ }
+ }
+}
+
+// loadDatum loads decoded Avro data to the schema fields' builder functions.
+// Since array.StructBuilder.AppendNull() will recursively append null to all of the
+// struct's fields, in the case of nil being passed to a struct's builderFunc it will
+// return an ErrNullStructData error to signal that all its sub-fields can be skipped.
+func (d *dataLoader) loadDatum(data any) error {
+ if d.list == nil && d.mapField == nil {
+ if d.mapValue != nil {
+ d.mapValue.appendFunc(data)
+ }
+ var NullParent *fieldPos
+ for _, f := range d.fields {
+ if f.parent == NullParent {
+ continue
+ }
+ if d.mapValue == nil {
+ err := f.appendFunc(f.getValue(data))
+ if err != nil {
+ if err == ErrNullStructData {
+ NullParent = f
+ continue
+ }
+ return err
+ }
+ } else {
+ switch dt := data.(type) {
+ case nil:
+ err := f.appendFunc(dt)
+ if err != nil {
+ if err == ErrNullStructData {
+ NullParent = f
+ continue
+ }
+ return err
+ }
+ case []any:
+ if len(d.children) < 1 {
+ for _, e := range dt {
+ err := f.appendFunc(e)
+ if err != nil {
+ if err == ErrNullStructData {
+ NullParent = f
+ continue
+ }
+ return err
+ }
+ }
+ } else {
+ for _, e := range dt {
+ d.children[0].loadDatum(e)
+ }
+ }
+ case map[string]any:
+ err := f.appendFunc(f.getValue(dt))
+ if err != nil {
+ if err == ErrNullStructData {
+ NullParent = f
+ continue
+ }
+ return err
+ }
+ }
+
+ }
+ }
+ for _, c := range d.children {
+ if c.list != nil {
+ c.loadDatum(c.list.getValue(data))
+ }
+ if c.mapField != nil {
+ switch dt := data.(type) {
+ case nil:
+ c.loadDatum(dt)
+ case map[string]any:
+ c.loadDatum(c.mapField.getValue(dt))
+ default:
+ c.loadDatum(c.mapField.getValue(data))
+ }
+ }
+ }
+ } else {
+ if d.list != nil {
+ switch dt := data.(type) {
+ case nil:
+ d.list.appendFunc(dt)
+ case []any:
+ d.list.appendFunc(dt)
+ for _, e := range dt {
+ if d.item != nil {
+ d.item.appendFunc(e)
+ }
+ var NullParent *fieldPos
+ for _, f := range d.fields {
+ if f.parent == NullParent {
+ continue
+ }
+ err := f.appendFunc(f.getValue(e))
+ if err != nil {
+ if err == ErrNullStructData {
+ NullParent = f
+ continue
+ }
+ return err
+ }
+ }
+ for _, c := range d.children {
+ if c.list != nil {
+ c.loadDatum(c.list.getValue(e))
+ }
+ if c.mapField != nil {
+ c.loadDatum(c.mapField.getValue(e))
+ }
+ }
+ }
+ case map[string]any:
+ d.list.appendFunc(dt["array"])
+ for _, e := range dt["array"].([]any) {
+ if d.item != nil {
+ d.item.appendFunc(e)
+ }
+ var NullParent *fieldPos
+ for _, f := range d.fields {
+ if f.parent == NullParent {
+ continue
+ }
+ err := f.appendFunc(f.getValue(e))
+ if err != nil {
+ if err == ErrNullStructData {
+ NullParent = f
+ continue
+ }
+ return err
+ }
+ }
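+ for _, c := range d.children {
+ if c.list != nil {
+ c.loadDatum(c.list.getValue(e))
+ }
+ if c.mapField != nil {
+ c.loadDatum(c.mapField.getValue(e))
+ }
+ }
+ }
+ default:
+ d.list.appendFunc(data)
+ if d.item != nil {
+ d.item.appendFunc(dt)
+ }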
+ }
+ }
+ if d.mapField != nil {
+ switch dt := data.(type) {
+ case nil:
+ d.mapField.appendFunc(dt)
+ case map[string]any:
+
+ d.mapField.appendFunc(dt)
+ for k, v := range dt {
+ d.mapKey.appendFunc(k)
+ if d.mapValue != nil {
+ d.mapValue.appendFunc(v)
+ } else {
+ d.children[0].loadDatum(v)
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (d *dataLoader) newChild() *dataLoader {
+ var child *dataLoader = &dataLoader{
+ depth: d.depth + 1,
+ }
+ d.children = append(d.children, child)
+ return child
+}
+
+func (d *dataLoader) newListChild(list *fieldPos) *dataLoader {
+ var child *dataLoader = &dataLoader{
+ list: list,
+ item: list.childrens[0],
+ depth: d.depth + 1,
+ }
+ d.children = append(d.children, child)
+ return child
+}
+
+func (d *dataLoader) newMapChild(mapField *fieldPos) *dataLoader {
+ var child *dataLoader = &dataLoader{
+ mapField: mapField,
+ depth: d.depth + 1,
+ }
+ d.children = append(d.children, child)
+ return child
+}
+
+type fieldPos struct {
+ parent *fieldPos
+ fieldName string
+ builder array.Builder
+ path []string
+ isList bool
+ isItem bool
+ isStruct bool
+ isMap bool
+ typeName string
+ appendFunc func(val interface{}) error
+ metadatas arrow.Metadata
+ childrens []*fieldPos
+ index, depth int32
+}
+
+func newFieldPos() *fieldPos { return &fieldPos{index: -1} }
+
+func (f *fieldPos) children() []*fieldPos { return f.childrens }
+
+func (f *fieldPos) newChild(childName string, childBuilder array.Builder, meta arrow.Metadata) *fieldPos {
+ var child fieldPos = fieldPos{
+ parent: f,
+ fieldName: childName,
+ builder: childBuilder,
+ metadatas: meta,
+ index: int32(len(f.childrens)),
+ depth: f.depth + 1,
+ }
+ if f.isList {
+ child.isItem = true
+ }
+ child.path = child.buildNamePath()
+ f.childrens = append(f.childrens, &child)
+ return &child
+}
+
+func (f *fieldPos) buildNamePath() []string {
+ var path []string
+ var listPath []string
+ cur := f
+ for i := f.depth - 1; i >= 0; i-- {
+ if cur.typeName == "" {
+ path = append([]string{cur.fieldName}, path...)
+ } else {
+ path = append([]string{cur.fieldName, cur.typeName}, path...)
+ }
+ if !cur.parent.isMap {
+ cur = cur.parent
+ }
+ }
+ if f.parent.parent != nil && f.parent.parent.isList {
+ for i := len(path) - 1; i >= 0; i-- {
+ if path[i] != "item" {
+ listPath = append([]string{path[i]}, listPath...)
+ } else {
+ return listPath
+ }
+ }
+ }
+ if f.parent != nil && f.parent.fieldName == "value" {
+ for i := len(path) - 1; i >= 0; i-- {
+ if path[i] != "value" {
+ listPath = append([]string{path[i]}, listPath...)
+ } else {
+ return listPath
+ }
+ }
+ }
+ return path
+}
+
+// namePath returns the slice of keys making up the path to the field.
+func (f *fieldPos) namePath() []string { return f.path }
+
+// getValue retrieves the value from the map[string]any
+// by following the field's key path.
+func (f *fieldPos) getValue(m any) any {
+ if _, ok := m.(map[string]any); !ok {
+ return m
+ }
+ for _, key := range f.namePath() {
+ valueMap, ok := m.(map[string]any)
+ if !ok {
+ if key == "item" {
+ return m
+ }
+ return nil
+ }
+ m, ok = valueMap[key]
+ if !ok {
+ return nil
+ }
+ }
+ return m
+}
+
+// Avro data is loaded to Arrow arrays using the following type mapping:
+//
+// Avro     Go               Arrow
+// null     nil              Null
+// boolean  bool             Boolean
+// bytes    []byte           Binary
+// float    float32          Float32
+// double   float64          Float64
+// long     int64            Int64
+// int      int32            Int32
+// string   string           String
+// array    []interface{}    List
+// enum     string           Dictionary
+// fixed    []byte           FixedSizeBinary
+// map      map[string]any   Map
+// record   map[string]any   Struct
+//
+// mapFieldBuilders builds a tree of field builders matching the Arrow schema
+func mapFieldBuilders(b array.Builder, field arrow.Field, parent *fieldPos) {
+ f := parent.newChild(field.Name, b, field.Metadata)
+ switch bt := b.(type) {
+ case *array.BinaryBuilder:
+ f.appendFunc = func(data interface{}) error {
+ appendBinaryData(bt, data)
+ return nil
+ }
+ case *array.BinaryDictionaryBuilder:
+ // has metadata for Avro enum symbols
+ f.appendFunc = func(data interface{}) error {
+ appendBinaryDictData(bt, data)
+ return nil
+ }
+ // add Avro enum symbols to builder
+ sb := array.NewStringBuilder(memory.DefaultAllocator)
+ for _, v := range field.Metadata.Values() {
+ sb.Append(v)
+ }
+ sa := sb.NewStringArray()
+ bt.InsertStringDictValues(sa)
+ case *array.BooleanBuilder:
+ f.appendFunc = func(data interface{}) error {
+ appendBoolData(bt, data)
+ return nil
+ }
+ case *array.Date32Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendDate32Data(bt, data)
+ return nil
+ }
+ case *array.Decimal128Builder:
+ f.appendFunc = func(data interface{}) error {
+ return appendDecimal128Data(bt, data)
+ }
+ case *array.Decimal256Builder:
+ f.appendFunc = func(data interface{}) error {
+ return appendDecimal256Data(bt, data)
+ }
+ case *types.UUIDBuilder:
+ f.appendFunc = func(data interface{}) error {
+ switch dt := data.(type) {
+ case nil:
+ bt.AppendNull()
+ case string:
+ err := bt.AppendValueFromString(dt)
+ if err != nil {
+ return err
+ }
+ case []byte:
+ err := bt.AppendValueFromString(string(dt))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ case *array.FixedSizeBinaryBuilder:
+ f.appendFunc = func(data interface{}) error {
+ appendFixedSizeBinaryData(bt, data)
+ return nil
+ }
+ case *array.Float32Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendFloat32Data(bt, data)
+ return nil
+ }
+ case *array.Float64Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendFloat64Data(bt, data)
+ return nil
+ }
+ case *array.Int32Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendInt32Data(bt, data)
+ return nil
+ }
+ case *array.Int64Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendInt64Data(bt, data)
+ return nil
+ }
+ case *array.LargeListBuilder:
+ vb := bt.ValueBuilder()
+ f.isList = true
+ mapFieldBuilders(vb, field.Type.(*arrow.LargeListType).ElemField(), f)
+ f.appendFunc = func(data interface{}) error {
+ switch dt := data.(type) {
+ case nil:
+ bt.AppendNull()
+ case []interface{}:
+ if len(dt) == 0 {
+ bt.AppendEmptyValue()
+ } else {
+ bt.Append(true)
+ }
+ default:
+ bt.Append(true)
+ }
+ return nil
+ }
+ case *array.ListBuilder:
+ vb := bt.ValueBuilder()
+ f.isList = true
+ mapFieldBuilders(vb, field.Type.(*arrow.ListType).ElemField(), f)
+ f.appendFunc = func(data interface{}) error {
+ switch dt := data.(type) {
+ case nil:
+ bt.AppendNull()
+ case []interface{}:
+ if len(dt) == 0 {
+ bt.AppendEmptyValue()
+ } else {
+ bt.Append(true)
+ }
+ default:
+ bt.Append(true)
+ }
+ return nil
+ }
+ case *array.MapBuilder:
+ // has metadata for objects in values
+ f.isMap = true
+ kb := bt.KeyBuilder()
+ ib := bt.ItemBuilder()
+ mapFieldBuilders(kb, field.Type.(*arrow.MapType).KeyField(), f)
+ mapFieldBuilders(ib, field.Type.(*arrow.MapType).ItemField(), f)
+ f.appendFunc = func(data interface{}) error {
+ switch data.(type) {
+ case nil:
+ bt.AppendNull()
+ default:
+ bt.Append(true)
+ }
+ return nil
+ }
+ case *array.MonthDayNanoIntervalBuilder:
+ f.appendFunc = func(data interface{}) error {
+ appendDurationData(bt, data)
+ return nil
+ }
+ case *array.StringBuilder:
+ f.appendFunc = func(data interface{}) error {
+ appendStringData(bt, data)
+ return nil
+ }
+ case *array.StructBuilder:
+ // has metadata for Avro Union named types
+ f.typeName, _ = field.Metadata.GetValue("typeName")
+ f.isStruct = true
+ // create children
+ for i, p := range field.Type.(*arrow.StructType).Fields() {
+ mapFieldBuilders(bt.FieldBuilder(i), p, f)
+ }
+ f.appendFunc = func(data interface{}) error {
+ switch data.(type) {
+ case nil:
+ bt.AppendNull()
+ return ErrNullStructData
+ default:
+ bt.Append(true)
+ }
+ return nil
+ }
+ case *array.Time32Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendTime32Data(bt, data)
+ return nil
+ }
+ case *array.Time64Builder:
+ f.appendFunc = func(data interface{}) error {
+ appendTime64Data(bt, data)
+ return nil
+ }
+ case *array.TimestampBuilder:
+ f.appendFunc = func(data interface{}) error {
+ appendTimestampData(bt, data)
+ return nil
+ }
+ }
+}
+
+func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case map[string]any:
+ switch ct := dt["bytes"].(type) {
+ case nil:
+ b.AppendNull()
+ default:
+ b.Append(ct.([]byte))
+ }
+ default:
+ b.Append(fmt.Append([]byte{}, data))
+ }
+}
+
+func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case string:
+ b.AppendString(dt)
+ case map[string]any:
+ switch v := dt["string"].(type) {
+ case nil:
+ b.AppendNull()
+ case string:
+ b.AppendString(v)
+ }
+ }
+}
+
+func appendBoolData(b *array.BooleanBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case bool:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["boolean"].(type) {
+ case nil:
+ b.AppendNull()
+ case bool:
+ b.Append(v)
+ }
+ }
+}
+
+func appendDate32Data(b *array.Date32Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case int32:
+ b.Append(arrow.Date32(dt))
+ case map[string]any:
+ switch v := dt["int"].(type) {
+ case nil:
+ b.AppendNull()
+ case int32:
+ b.Append(arrow.Date32(v))
+ }
+ }
+}
+
+func appendDecimal128Data(b *array.Decimal128Builder, data interface{}) error {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ buf := bytes.NewBuffer(dt)
+ if len(dt) <= 38 {
+ var intData int64
+ err := binary.Read(buf, binary.BigEndian, &intData)
+ if err != nil {
+ return err
+ }
+ b.Append(decimal128.FromI64(intData))
+ } else {
+ var bigIntData big.Int
+ b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
+ }
+ case map[string]any:
+ buf := bytes.NewBuffer(dt["bytes"].([]byte))
+ if len(dt["bytes"].([]byte)) <= 38 {
+ var intData int64
+ err := binary.Read(buf, binary.BigEndian, &intData)
+ if err != nil {
+ return err
+ }
+ b.Append(decimal128.FromI64(intData))
+ } else {
+ var bigIntData big.Int
+ b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
+ }
+ }
+ return nil
+}
+
+func appendDecimal256Data(b *array.Decimal256Builder, data interface{}) error {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ var bigIntData big.Int
+ buf := bytes.NewBuffer(dt)
+ b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
+ case map[string]any:
+ var bigIntData big.Int
+ buf := bytes.NewBuffer(dt["bytes"].([]byte))
+ b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
+ }
+ return nil
+}
+
+// The Avro duration logical type annotates an Avro fixed type of size 12, which stores three little-endian
+// unsigned 32-bit integers that represent durations at different granularities of time. The first stores
+// a number of months, the second a number of days, and the third a number of milliseconds.
+func appendDurationData(b *array.MonthDayNanoIntervalBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ dur := new(arrow.MonthDayNanoInterval)
+ // each component is a 4-byte little-endian unsigned integer
+ dur.Months = int32(binary.LittleEndian.Uint32(dt[0:4]))
+ dur.Days = int32(binary.LittleEndian.Uint32(dt[4:8]))
+ // widen to int64 before scaling ms -> ns so the multiplication cannot overflow uint32
+ dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dt[8:12])) * 1000000
+ b.Append(*dur)
+ case map[string]any:
+ switch dtb := dt["bytes"].(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ dur := new(arrow.MonthDayNanoInterval)
+ dur.Months = int32(binary.LittleEndian.Uint32(dtb[0:4]))
+ dur.Days = int32(binary.LittleEndian.Uint32(dtb[4:8]))
+ dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dtb[8:12])) * 1000000
+ b.Append(*dur)
+ }
+ }
+}
+
+func appendFixedSizeBinaryData(b *array.FixedSizeBinaryBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["bytes"].(type) {
+ case nil:
+ b.AppendNull()
+ case []byte:
+ b.Append(v)
+ }
+ }
+}
+
+func appendFloat32Data(b *array.Float32Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case float32:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["float"].(type) {
+ case nil:
+ b.AppendNull()
+ case float32:
+ b.Append(v)
+ }
+ }
+}
+
+func appendFloat64Data(b *array.Float64Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case float64:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["double"].(type) {
+ case nil:
+ b.AppendNull()
+ case float64:
+ b.Append(v)
+ }
+ }
+}
+
+func appendInt32Data(b *array.Int32Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case int:
+ b.Append(int32(dt))
+ case int32:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["int"].(type) {
+ case nil:
+ b.AppendNull()
+ case int:
+ b.Append(int32(v))
+ case int32:
+ b.Append(v)
+ }
+ }
+}
+
+func appendInt64Data(b *array.Int64Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case int:
+ b.Append(int64(dt))
+ case int64:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["long"].(type) {
+ case nil:
+ b.AppendNull()
+ case int:
+ b.Append(int64(v))
+ case int64:
+ b.Append(v)
+ }
+ }
+}
+
+func appendStringData(b *array.StringBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case string:
+ b.Append(dt)
+ case map[string]any:
+ switch v := dt["string"].(type) {
+ case nil:
+ b.AppendNull()
+ case string:
+ b.Append(v)
+ }
+ default:
+ b.Append(fmt.Sprint(data))
+ }
+}
+
+func appendTime32Data(b *array.Time32Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case int32:
+ b.Append(arrow.Time32(dt))
+ case map[string]any:
+ switch v := dt["int"].(type) {
+ case nil:
+ b.AppendNull()
+ case int32:
+ b.Append(arrow.Time32(v))
+ }
+ }
+}
+
+func appendTime64Data(b *array.Time64Builder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case int64:
+ b.Append(arrow.Time64(dt))
+ case map[string]any:
+ switch v := dt["long"].(type) {
+ case nil:
+ b.AppendNull()
+ case int64:
+ b.Append(arrow.Time64(v))
+ }
+ }
+}
+
+func appendTimestampData(b *array.TimestampBuilder, data interface{}) {
+ switch dt := data.(type) {
+ case nil:
+ b.AppendNull()
+ case int64:
+ b.Append(arrow.Timestamp(dt))
+ case map[string]any:
+ switch v := dt["long"].(type) {
+ case nil:
+ b.AppendNull()
+ case int64:
+ b.Append(arrow.Timestamp(v))
+ }
+ }
+}
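The comment on `appendDurationData` describes the Avro duration payload as a 12-byte fixed value holding three little-endian unsigned integers (months, days, milliseconds). As an illustration only, here is a minimal standalone sketch of that decoding. `monthDayNano` is a hypothetical local stand-in for `arrow.MonthDayNanoInterval` (same field names and widths) so the example runs without the Arrow module, and `decodeAvroDuration` is likewise a name chosen for this sketch. It reads each 4-byte component with `binary.LittleEndian.Uint32` and widens the millisecond count to `int64` before scaling to nanoseconds, because multiplying a `uint32` by 1,000,000 can overflow.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// monthDayNano is a hypothetical stand-in for arrow.MonthDayNanoInterval,
// defined locally so this sketch has no Arrow dependency.
type monthDayNano struct {
	Months      int32
	Days        int32
	Nanoseconds int64
}

// decodeAvroDuration (hypothetical helper) converts the payload of an Avro
// "duration" (fixed, size 12) into month/day/nanosecond components.
func decodeAvroDuration(b []byte) (monthDayNano, error) {
	if len(b) != 12 {
		return monthDayNano{}, errors.New("avro duration must be exactly 12 bytes")
	}
	months := binary.LittleEndian.Uint32(b[0:4])
	days := binary.LittleEndian.Uint32(b[4:8])
	millis := binary.LittleEndian.Uint32(b[8:12])
	return monthDayNano{
		// Values above math.MaxInt32 wrap when narrowed, since the
		// target fields are signed.
		Months: int32(months),
		Days:   int32(days),
		// Widen before multiplying so ms -> ns cannot overflow 32 bits.
		Nanoseconds: int64(millis) * 1000000,
	}, nil
}

func main() {
	// 1 month, 2 days, 3000 ms (0x0BB8 little-endian)
	payload := []byte{1, 0, 0, 0, 2, 0, 0, 0, 0xB8, 0x0B, 0, 0}
	d, err := decodeAvroDuration(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", d)
}
```

Running it prints `{Months:1 Days:2 Nanoseconds:3000000000}`.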
diff --git a/go/arrow/avro/schema.go b/go/arrow/avro/schema.go
new file mode 100644
index 0000000000..32e37096c6
--- /dev/null
+++ b/go/arrow/avro/schema.go
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package avro reads Avro OCF files and presents the extracted data as records
+package avro
+
+import (
+ "fmt"
+ "math"
+ "strconv"
+
+ "github.com/apache/arrow/go/v15/arrow"
+ "github.com/apache/arrow/go/v15/arrow/decimal128"
+ "github.com/apache/arrow/go/v15/internal/types"
+ avro "github.com/hamba/avro/v2"
+)
+
+type schemaNode struct {
+ name string
+ parent *schemaNode
+ schema avro.Schema
+ union bool
+ nullable bool
+ childrens []*schemaNode
+ arrowField arrow.Field
+ schemaCache *avro.SchemaCache
+ index, depth int32
+}
+
+func newSchemaNode() *schemaNode {
+ var schemaCache avro.SchemaCache
+ return &schemaNode{name: "", index: -1, schemaCache: &schemaCache}
+}
+
+func (node *schemaNode) schemaPath() string {
+ var path string
+ n := node
+ for n.parent != nil {
+ path = "." + n.name + path
+ n = n.parent
+ }
+ return path
+}
+
+func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode {
+ child := &schemaNode{
+ name: n,
+ parent: node,
+ schema: s,
+ schemaCache: node.schemaCache,
+ index: int32(len(node.childrens)),
+ depth: node.depth + 1,
+ }
+ node.childrens = append(node.childrens, child)
+ return child
+}
+func (node *schemaNode) children() []*schemaNode { return node.childrens }
+
+// func (node *schemaNode) nodeName() string { return node.name }
+
+// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema
+func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ s = nil
+ switch x := r.(type) {
+ case string:
+ err = fmt.Errorf("invalid avro schema: %s", x)
+ case error:
+ err = fmt.Errorf("invalid avro schema: %w", x)
+ default:
+ err = fmt.Errorf("invalid avro schema: unknown error")
+ }
+ }
+ }()
+ n := newSchemaNode()
+ n.schema = schema
+ c := n.newChild(n.schema.(avro.NamedSchema).Name(), n.schema)
+ arrowSchemafromAvro(c)
+ var fields []arrow.Field
+ for _, g := range c.children() {
+ fields = append(fields, g.arrowField)
+ }
+ s = arrow.NewSchema(fields, nil)
+ return s, nil
+}
+
+func arrowSchemafromAvro(n *schemaNode) {
+ if ns, ok := n.schema.(avro.NamedSchema); ok {
+ n.schemaCache.Add(ns.Name(), ns)
+ }
+ switch st := n.schema.Type(); st {
+ case "record":
+ iterateFields(n)
+ case "enum":
+ n.schemaCache.Add(n.schema.(avro.NamedSchema).Name(), n.schema.(*avro.EnumSchema))
+ symbols := make(map[string]string)
+ for index, symbol := range n.schema.(avro.PropertySchema).(*avro.EnumSchema).Symbols() {
+ k := strconv.FormatInt(int64(index), 10)
+ symbols[k] = symbol
+ }
+ var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false}
+ sl := int64(len(symbols))
+ switch {
+ case sl <= math.MaxUint8:
+ dt.IndexType = arrow.PrimitiveTypes.Uint8
+ case sl > math.MaxUint8 && sl <= math.MaxUint16:
+ dt.IndexType = arrow.PrimitiveTypes.Uint16
+ case sl > math.MaxUint16 && sl <= math.MaxUint32:
+ dt.IndexType = arrow.PrimitiveTypes.Uint32
+ }
+ n.arrowField = buildArrowField(n, &dt, arrow.MetadataFrom(symbols))
+ case "array":
+ // logical items type
+ c := n.newChild(n.name, n.schema.(*avro.ArraySchema).Items())
+ if isLogicalSchemaType(n.schema.(*avro.ArraySchema).Items()) {
+ avroLogicalToArrowField(c)
+ } else {
+ arrowSchemafromAvro(c)
+ }
+ switch c.arrowField.Nullable {
+ case true:
+ n.arrowField = arrow.Field{Name: n.name, Type: arrow.ListOfField(c.arrowField), Metadata: c.arrowField.Metadata}
+ case false:
+ n.arrowField = arrow.Field{Name: n.name, Type: arrow.ListOfNonNullable(c.arrowField.Type), Metadata: c.arrowField.Metadata}
+ }
+ case "map":
+ // only named value schemas can be cached; primitive map values are not NamedSchema
+ if ns, ok := n.schema.(*avro.MapSchema).Values().(avro.NamedSchema); ok {
+ n.schemaCache.Add(ns.Name(), ns)
+ }
+ c := n.newChild(n.name, n.schema.(*avro.MapSchema).Values())
+ arrowSchemafromAvro(c)
+ n.arrowField = buildArrowField(n, arrow.MapOf(arrow.BinaryTypes.String, c.arrowField.Type), c.arrowField.Metadata)
+ case "union":
+ if n.schema.(*avro.UnionSchema).Nullable() {
+ if len(n.schema.(*avro.UnionSchema).Types()) > 1 {
+ n.schema = n.schema.(*avro.UnionSchema).Types()[1]
+ n.union = true
+ n.nullable = true
+ arrowSchemafromAvro(n)
+ }
+ }
+ // Avro "fixed" field type = Arrow FixedSize Primitive BinaryType
+ case "fixed":
+ n.schemaCache.Add(n.schema.(avro.NamedSchema).Name(), n.schema.(*avro.FixedSchema))
+ if isLogicalSchemaType(n.schema) {
+ avroLogicalToArrowField(n)
+ } else {
+ n.arrowField = buildArrowField(n, &arrow.FixedSizeBinaryType{ByteWidth: n.schema.(*avro.FixedSchema).Size()}, arrow.Metadata{})
+ }
+ case "string", "bytes", "int", "long":
+ if isLogicalSchemaType(n.schema) {
+ avroLogicalToArrowField(n)
+ } else {
+ n.arrowField = buildArrowField(n, avroPrimitiveToArrowType(string(st)), arrow.Metadata{})
+ }
+ case "float", "double", "boolean":
+ n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: n.nullable}
+ case "<ref>":
+ refSchema := n.schemaCache.Get(string(n.schema.(*avro.RefSchema).Schema().Name()))
+ if refSchema == nil {
+ panic(fmt.Errorf("could not find schema for '%v' in schema cache - %v", n.schemaPath(), n.schema.(*avro.RefSchema).Schema().Name()))
+ }
+ n.schema = refSchema
+ arrowSchemafromAvro(n)
+ case "null":
+ n.nullable = true
+ n.arrowField = buildArrowField(n, arrow.Null, arrow.Metadata{})
+ }
+}
+
+// iterateFields adds a child node for each field of a record schema and builds the record's Arrow struct field from them.
+func iterateFields(n *schemaNode) {
+ for _, f := range n.schema.(*avro.RecordSchema).Fields() {
+ switch ft := f.Type().(type) {
+ // Avro "array" field type
+ case *avro.ArraySchema:
+ n.schemaCache.Add(f.Name(), ft.Items())
+ // logical items type
+ c := n.newChild(f.Name(), ft.Items())
+ if isLogicalSchemaType(ft.Items()) {
+ avroLogicalToArrowField(c)
+ } else {
+ arrowSchemafromAvro(c)
+ }
+ switch c.arrowField.Nullable {
+ case true:
+ c.arrowField = arrow.Field{Name: c.name, Type: arrow.ListOfField(c.arrowField), Metadata: c.arrowField.Metadata}
+ case false:
+ c.arrowField = arrow.Field{Name: c.name, Type: arrow.ListOfNonNullable(c.arrowField.Type), Metadata: c.arrowField.Metadata}
+ }
+ // Avro "enum" field type = Arrow dictionary type
+ case *avro.EnumSchema:
+ n.schemaCache.Add(f.Type().(*avro.EnumSchema).Name(), f.Type())
+ c := n.newChild(f.Name(), f.Type())
+ symbols := make(map[string]string)
+ for index, symbol := range ft.Symbols() {
+ k := strconv.FormatInt(int64(index), 10)
+ symbols[k] = symbol
+ }
+ var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false}
+ sl := len(symbols)
+ switch {
+ case sl <= math.MaxUint8:
+ dt.IndexType = arrow.PrimitiveTypes.Uint8
+ case sl > math.MaxUint8 && sl <= math.MaxUint16:
+ dt.IndexType = arrow.PrimitiveTypes.Uint16
+ case sl > math.MaxUint16 && sl <= math.MaxInt:
+ dt.IndexType = arrow.PrimitiveTypes.Uint32
+ }
+ c.arrowField = buildArrowField(c, &dt, arrow.MetadataFrom(symbols))
+ // Avro "fixed" field type = Arrow FixedSize Primitive BinaryType
+ case *avro.FixedSchema:
+ n.schemaCache.Add(f.Name(), f.Type())
+ c := n.newChild(f.Name(), f.Type())
+ if isLogicalSchemaType(f.Type()) {
+ avroLogicalToArrowField(c)
+ } else {
+ arrowSchemafromAvro(c)
+ }
+ case *avro.RecordSchema:
+ n.schemaCache.Add(f.Name(), f.Type())
+ c := n.newChild(f.Name(), f.Type())
+ iterateFields(c)
+ // Avro "map" field type - KVP with value of one type - keys are strings
+ case *avro.MapSchema:
+ n.schemaCache.Add(f.Name(), ft.Values())
+ c := n.newChild(f.Name(), ft.Values())
+ arrowSchemafromAvro(c)
+ c.arrowField = buildArrowField(c, arrow.MapOf(arrow.BinaryTypes.String, c.arrowField.Type), c.arrowField.Metadata)
+ case *avro.UnionSchema:
+ if ft.Nullable() {
+ if len(ft.Types()) > 1 {
+ n.schemaCache.Add(f.Name(), ft.Types()[1])
+ c := n.newChild(f.Name(), ft.Types()[1])
+ c.union = true
+ c.nullable = true
+ arrowSchemafromAvro(c)
+ }
+ }
+ default:
+ n.schemaCache.Add(f.Name(), f.Type())
+ if isLogicalSchemaType(f.Type()) {
+ c := n.newChild(f.Name(), f.Type())
+ avroLogicalToArrowField(c)
+ } else {
+ c := n.newChild(f.Name(), f.Type())
+ arrowSchemafromAvro(c)
+ }
+
+ }
+ }
+ var fields []arrow.Field
+ for _, child := range n.children() {
+ fields = append(fields, child.arrowField)
+ }
+
+ namedSchema, ok := isNamedSchema(n.schema)
+
+ var md arrow.Metadata
+ if ok && namedSchema != n.name+"_data" && n.union {
+ md = arrow.NewMetadata([]string{"typeName"}, []string{namedSchema})
+ }
+ n.arrowField = buildArrowField(n, arrow.StructOf(fields...), md)
+}
+
+func isLogicalSchemaType(s avro.Schema) bool {
+ lts, ok := s.(avro.LogicalTypeSchema)
+ if !ok {
+ return false
+ }
+ if lts.Logical() != nil {
+ return true
+ }
+ return false
+}
+
+func isNamedSchema(s avro.Schema) (string, bool) {
+ if ns, ok := s.(avro.NamedSchema); ok {
+ return ns.FullName(), ok
+ }
+ return "", false
+}
+
+func buildArrowField(n *schemaNode, t arrow.DataType, m arrow.Metadata) arrow.Field {
+ return arrow.Field{
+ Name: n.name,
+ Type: t,
+ Metadata: m,
+ Nullable: n.nullable,
+ }
+}
+
+// Avro primitive type.
+//
+// NOTE: Arrow Binary type is used as a catchall to avoid potential data loss.
+func avroPrimitiveToArrowType(avroFieldType string) arrow.DataType {
+ switch avroFieldType {
+ // int: 32-bit signed integer
+ case "int":
+ return arrow.PrimitiveTypes.Int32
+ // long: 64-bit signed integer
+ case "long":
+ return arrow.PrimitiveTypes.Int64
+ // float: single precision (32-bit) IEEE 754 floating-point number
+ case "float":
+ return arrow.PrimitiveTypes.Float32
+ // double: double precision (64-bit) IEEE 754 floating-point number
+ case "double":
+ return arrow.PrimitiveTypes.Float64
+ // bytes: sequence of 8-bit unsigned bytes
+ case "bytes":
+ return arrow.BinaryTypes.Binary
+ // boolean: a binary value
+ case "boolean":
+ return arrow.FixedWidthTypes.Boolean
+ // string: unicode character sequence
+ case "string":
+ return arrow.BinaryTypes.String
+ }
+ return nil
+}
+
+func avroLogicalToArrowField(n *schemaNode) {
+ var dt arrow.DataType
+ // Avro logical types
+ switch lt := n.schema.(avro.LogicalTypeSchema).Logical(); lt.Type() {
+ // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10^-scale.
+ // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement
+ // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified
+ // using an attribute.
+ //
+ // The following attributes are supported:
+ // scale, a JSON integer representing the scale (optional). If not specified the scale is 0.
+ // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required).
+ case "decimal":
+ id := arrow.DECIMAL128
+ if lt.(*avro.DecimalLogicalSchema).Precision() > decimal128.MaxPrecision {
+ id = arrow.DECIMAL256
+ }
+ dt, _ = arrow.NewDecimalType(id, int32(lt.(*avro.DecimalLogicalSchema).Precision()), int32(lt.(*avro.DecimalLogicalSchema).Scale()))
+
+ // The uuid logical type represents a randomly generated universally unique identifier (UUID).
+ // A uuid logical type annotates an Avro string. The string has to conform with RFC 4122.
+ case "uuid":
+ dt = types.NewUUIDType()
+
+ // The date logical type represents a date within the calendar, with no reference to a particular
+ // time zone or time of day.
+ // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch,
+ // 1 January 1970 (ISO calendar).
+ case "date":
+ dt = arrow.FixedWidthTypes.Date32
+
+ // The time-millis logical type represents a time of day, with no reference to a particular calendar,
+ // time zone or date, with a precision of one millisecond.
+ // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds
+ // after midnight, 00:00:00.000.
+ case "time-millis":
+ dt = arrow.FixedWidthTypes.Time32ms
+
+ // The time-micros logical type represents a time of day, with no reference to a particular calendar,
+ // time zone or date, with a precision of one microsecond.
+ // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds
+ // after midnight, 00:00:00.000000.
+ case "time-micros":
+ dt = arrow.FixedWidthTypes.Time64us
+
+ // The timestamp-millis logical type represents an instant on the global timeline, independent of a
+ // particular time zone or calendar, with a precision of one millisecond. Please note that time zone
+ // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant,
+ // but not the original representation. In practice, such timestamps are typically displayed to users in
+ // their local time zones, therefore they may be displayed differently depending on the execution environment.
+ // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds
+ // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+ case "timestamp-millis":
+ dt = arrow.FixedWidthTypes.Timestamp_ms
+
+ // The timestamp-micros logical type represents an instant on the global timeline, independent of a
+ // particular time zone or calendar, with a precision of one microsecond. Please note that time zone
+ // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant,
+ // but not the original representation. In practice, such timestamps are typically displayed to users
+ // in their local time zones, therefore they may be displayed differently depending on the execution environment.
+ // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds
+ // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+ case "timestamp-micros":
+ dt = arrow.FixedWidthTypes.Timestamp_us
+
+ // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of
+ // what specific time zone is considered local, with a precision of one millisecond.
+ // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of
+ // milliseconds, from 1 January 1970 00:00:00.000.
+ // Note: not implemented in hamba/avro
+ // case "local-timestamp-millis":
+ // dt = &arrow.TimestampType{Unit: arrow.Millisecond}
+
+ // The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of
+ // what specific time zone is considered local, with a precision of one microsecond.
+ // A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of
+ // microseconds, from 1 January 1970 00:00:00.000000.
+ // Note: not implemented in hamba/avro
+ // case "local-timestamp-micros":
+ // dt = &arrow.TimestampType{Unit: arrow.Microsecond}
+
+ // The duration logical type represents an amount of time defined by a number of months, days and milliseconds.
+ // This is not equivalent to a number of milliseconds, because, depending on the moment in time from which the
+ // duration is measured, the number of days in the month and number of milliseconds in a day may differ. Other
+ // standard periods such as years, quarters, hours and minutes can be expressed through these basic periods.
+
+ // A duration logical type annotates Avro fixed type of size 12, which stores three little-endian unsigned integers
+ // that represent durations at different granularities of time. The first stores a number in months, the second
+ // stores a number in days, and the third stores a number in milliseconds.
+ case "duration":
+ dt = arrow.FixedWidthTypes.MonthDayNanoInterval
+ }
+ n.arrowField = buildArrowField(n, dt, arrow.Metadata{})
+}
diff --git a/go/arrow/avro/schema_test.go b/go/arrow/avro/schema_test.go
new file mode 100644
index 0000000000..08a3fe1ed7
--- /dev/null
+++ b/go/arrow/avro/schema_test.go
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package avro
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/apache/arrow/go/v15/arrow"
+ hamba "github.com/hamba/avro/v2"
+)
+
+func TestSchemaStringEqual(t *testing.T) {
+ tests := []struct {
+ avroSchema string
+ arrowSchema []arrow.Field
+ }{
+ {
+ avroSchema: `{
+ "fields": [
+ {
+ "name": "inheritNull",
+ "type": {
+ "name": "Simple",
+ "symbols": [
+ "a",
+ "b"
+ ],
+ "type": "enum"
+ }
+ },
+ {
+ "name": "explicitNamespace",
+ "type": {
+ "name": "test",
+ "namespace": "org.hamba.avro",
+ "size": 12,
+ "type": "fixed"
+ }
+ },
+ {
+ "name": "fullName",
+ "type": {
+ "type": "record",
+ "name": "fullName_data",
+ "namespace": "ignored",
+ "doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.",
+ "fields": [{
+ "name": "inheritNamespace",
+ "type": {
+ "type": "enum",
+ "name": "Understanding",
+ "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.",
+ "symbols": ["d", "e"]
+ }
+ }, {
+ "name": "md5",
+ "type": {
+ "name": "md5_data",
+ "type": "fixed",
+ "size": 16,
+ "namespace": "ignored"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "bigId",
+ "type": "long"
+ },
+ {
+ "name": "temperature",
+ "type": [
+ "null",
+ "float"
+ ]
+ },
+ {
+ "name": "fraction",
+ "type": [
+ "null",
+ "double"
+ ]
+ },
+ {
+ "name": "is_emergency",
+ "type": "boolean"
+ },
+ {
+ "name": "remote_ip",
+ "type": [
+ "null",
+ "bytes"
+ ]
+ },
+ {
+ "name": "person",
+ "type": {
+ "fields": [
+ {
+ "name": "lastname",
+ "type": "string"
+ },
+ {
+ "name": "address",
+ "type": {
+ "fields": [
+ {
+ "name": "streetaddress",
+ "type": "string"
+ },
+ {
+ "name": "city",
+ "type": "string"
+ }
+ ],
+ "name": "AddressUSRecord",
+ "type": "record"
+ }
+ },
+ {
+ "name": "mapfield",
+ "type": {
+ "default": {
+ },
+ "type": "map",
+ "values": "long"
+ }
+ },
+ {
+ "name": "arrayField",
+ "type": {
+ "default": [
+ ],
+ "items": "string",
+ "type": "array"
+ }
+ }
+ ],
+ "name": "person_data",
+ "type": "record"
+ }
+ },
+ {
+ "name": "decimalField",
+ "type": {
+ "logicalType": "decimal",
+ "precision": 4,
+ "scale": 2,
+ "type": "bytes"
+ }
+ },
+ {
+ "logicalType": "uuid",
+ "name": "uuidField",
+ "type": "string"
+ },
+ {
+ "name": "timemillis",
+ "type": {
+ "type": "int",
+ "logicalType": "time-millis"
+ }
+ },
+ {
+ "name": "timemicros",
+ "type": {
+ "type": "long",
+ "logicalType": "time-micros"
+ }
+ },
+ {
+ "name": "timestampmillis",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ }
+ },
+ {
+ "name": "timestampmicros",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-micros"
+ }
+ },
+ {
+ "name": "duration",
+ "type": {
+ "name": "duration",
+ "namespace": "whyowhy",
+ "logicalType": "duration",
+ "size": 12,
+ "type": "fixed"
+ }
+ },
+ {
+ "name": "date",
+ "type": {
+ "logicalType": "date",
+ "type": "int"
+ }
+ }
+ ],
+ "name": "Example",
+ "type": "record"
+ }`,
+ arrowSchema: []arrow.Field{
+ {
+ Name: "inheritNull",
+ Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String, Ordered: false},
+ Metadata: arrow.MetadataFrom(map[string]string{"0": "a", "1": "b"}),
+ },
+ {
+ Name: "explicitNamespace",
+ Type: &arrow.FixedSizeBinaryType{ByteWidth: 12},
+ },
+ {
+ Name: "fullName",
+ Type: arrow.StructOf(
+ arrow.Field{
+ Name: "inheritNamespace",
+ Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String, Ordered: false},
+ },
+ arrow.Field{
+ Name: "md5",
+ Type: &arrow.FixedSizeBinaryType{ByteWidth: 16},
+ },
+ ),
+ },
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ },
+ {
+ Name: "bigId",
+ Type: arrow.PrimitiveTypes.Int64,
+ },
+ {
+ Name: "temperature",
+ Type: arrow.PrimitiveTypes.Float32,
+ Nullable: true,
+ },
+ {
+ Name: "fraction",
+ Type: arrow.PrimitiveTypes.Float64,
+ Nullable: true,
+ },
+ {
+ Name: "is_emergency",
+ Type: arrow.FixedWidthTypes.Boolean,
+ },
+ {
+ Name: "remote_ip",
+ Type: arrow.BinaryTypes.Binary,
+ Nullable: true,
+ },
+ {
+ Name: "person",
+ Type: arrow.StructOf(
+ arrow.Field{
+ Name: "lastname",
+ Type: arrow.BinaryTypes.String,
+ Nullable: true,
+ },
+ arrow.Field{
+ Name: "address",
+ Type: arrow.StructOf(
+ arrow.Field{
+ Name: "streetaddress",
+ Type: arrow.BinaryTypes.String,
+ },
+ arrow.Field{
+ Name: "city",
+ Type: arrow.BinaryTypes.String,
+ },
+ ),
+ },
+ arrow.Field{
+ Name: "mapfield",
+ Type: arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int64),
+ Nullable: true,
+ },
+ arrow.Field{
+ Name: "arrayField",
+ Type: arrow.ListOfNonNullable(arrow.BinaryTypes.String),
+ },
+ ),
+ },
+ {
+ Name: "decimalField",
+ Type: &arrow.Decimal128Type{Precision: 4, Scale: 2},
+ },
+ {
+ Name: "uuidField",
+ Type: arrow.BinaryTypes.String,
+ },
+ {
+ Name: "timemillis",
+ Type: arrow.FixedWidthTypes.Time32ms,
+ },
+ {
+ Name: "timemicros",
+ Type: arrow.FixedWidthTypes.Time64us,
+ },
+ {
+ Name: "timestampmillis",
+ Type: arrow.FixedWidthTypes.Timestamp_ms,
+ },
+ {
+ Name: "timestampmicros",
+ Type: arrow.FixedWidthTypes.Timestamp_us,
+ },
+ {
+ Name: "duration",
+ Type: arrow.FixedWidthTypes.MonthDayNanoInterval,
+ },
+ {
+ Name: "date",
+ Type: arrow.FixedWidthTypes.Date32,
+ },
+ },
+ },
+ }
+
+ for _, test := range tests {
+ t.Run("", func(t *testing.T) {
+ want := arrow.NewSchema(test.arrowSchema, nil)
+ schema, err := hamba.ParseBytes([]byte(test.avroSchema))
+ if err != nil {
+ t.Fatalf("%v", err)
+ }
+ got, err := ArrowSchemaFromAvro(schema)
+ if err != nil {
+ t.Fatalf("%v", err)
+ }
+ if !(fmt.Sprintf("%+v", want.String()) == fmt.Sprintf("%+v", got.String())) {
+ t.Fatalf("got=%v,\n want=%v", got.String(), want.String())
+ } else {
+ t.Logf("schema.String() comparison passed")
+ }
+ })
+ }
+}
diff --git a/go/arrow/avro/testdata/arrayrecordmap.avro b/go/arrow/avro/testdata/arrayrecordmap.avro
new file mode 100644
index 0000000000..84a8b59b42
Binary files /dev/null and b/go/arrow/avro/testdata/arrayrecordmap.avro differ
diff --git a/go/arrow/avro/testdata/githubsamplecommits.avro b/go/arrow/avro/testdata/githubsamplecommits.avro
new file mode 100644
index 0000000000..f16d17d29e
Binary files /dev/null and b/go/arrow/avro/testdata/githubsamplecommits.avro differ
diff --git a/go/go.mod b/go/go.mod
index a6c2af7025..73a1cb7e77 100644
--- a/go/go.mod
+++ b/go/go.mod
@@ -47,7 +47,9 @@ require (
require (
github.com/google/uuid v1.3.1
+ github.com/hamba/avro/v2 v2.17.2
github.com/substrait-io/substrait-go v0.4.2
+ github.com/tidwall/sjson v1.2.5
)
require (
@@ -57,14 +59,21 @@ require (
github.com/fatih/color v1.15.0 // indirect
github.com/goccy/go-yaml v1.11.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
+ github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
+ github.com/mitchellh/mapstructure v1.5.0 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
+ github.com/tidwall/gjson v1.14.2 // indirect
+ github.com/tidwall/match v1.1.1 // indirect
+ github.com/tidwall/pretty v1.2.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/text v0.13.0 // indirect
diff --git a/go/go.sum b/go/go.sum
index bdd499c3f5..2c1edd59e0 100644
--- a/go/go.sum
+++ b/go/go.sum
@@ -34,10 +34,15 @@ github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8i
github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/hamba/avro/v2 v2.17.2 h1:6PKpEWzJfNnvBgn7m2/8WYaDOUASxfDU+Jyb4ojDgFY=
+github.com/hamba/avro/v2 v2.17.2/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
@@ -60,6 +65,13 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
+github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -73,12 +85,21 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/substrait-io/substrait-go v0.4.2 h1:buDnjsb3qAqTaNbOR7VKmNgXf4lYQxWEcnSGUWBtmN8=
github.com/substrait-io/substrait-go v0.4.2/go.mod h1:qhpnLmrcvAnlZsUyPXZRqldiHapPTXC3t7xFgDi3aQg=
+github.com/tidwall/gjson v1.14.2 h1:6BBkirS0rAHjumnjHF6qgy5d2YAJ1TLIaFE2lzfOLqo=
+github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
+github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
+github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
+github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
+github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
+github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
+github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=