This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new de5aa54  ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
de5aa54 is described below

commit de5aa544275c9664d17874b1b93de195fb619825
Author: Chris Casola <[email protected]>
AuthorDate: Thu Nov 18 15:06:36 2021 -0500

    ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
    
    Allocate the body of messages from the memory.Allocator instead
    of using make.
    
    Closes #11712 from chriscasola/ccasola--jira-14717
    
    Authored-by: Chris Casola <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/arrow/internal/arrdata/ioutil.go  |   1 +
 go/arrow/ipc/file_reader.go          |  40 ++++++++++----
 go/arrow/ipc/message.go              |  19 +++++--
 go/arrow/ipc/message_test.go         | 102 +++++++++++++++++++++++++++++++++++
 go/arrow/ipc/reader.go               |   4 +-
 go/arrow/memory/checked_allocator.go |  21 ++++----
 6 files changed, 162 insertions(+), 25 deletions(-)

diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go
index a7e3e41..c6a2a39 100644
--- a/go/arrow/internal/arrdata/ioutil.go
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -85,6 +85,7 @@ func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, sc
                        errs <- fmt.Errorf("could not read record %d: %v", i, 
err)
                        return
                }
+               defer rec.Release()
                if !array.RecordEqual(rec, recs[i]) {
                        errs <- fmt.Errorf("records[%d] differ", i)
                }
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 78d8067..b258199 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -47,6 +47,8 @@ type FileReader struct {
 
        irec int   // current record index. used for the arrio.Reader interface
        err  error // last error
+
+       mem memory.Allocator
 }
 
 // NewFileReader opens an Arrow file using the provided reader r.
@@ -59,6 +61,7 @@ func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
                        r:      r,
                        fields: make(dictTypeMap),
                        memo:   newMemo(),
+                       mem:    cfg.alloc,
                }
        )
 
@@ -288,7 +291,7 @@ func (f *FileReader) RecordAt(i int) (array.Record, error) {
                return nil, xerrors.Errorf("arrow/ipc: message %d is not a 
Record", i)
        }
 
-       return newRecord(f.schema, msg.meta, 
bytes.NewReader(msg.body.Bytes())), nil
+       return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes()), 
f.mem), nil
 }
 
 // Read reads the current record from the underlying stream and an error, if 
any.
@@ -310,7 +313,7 @@ func (f *FileReader) ReadAt(i int64) (array.Record, error) {
        return f.Record(int(i))
 }
 
-func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) 
array.Record {
+func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker, 
mem memory.Allocator) array.Record {
        var (
                msg   = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
                md    flatbuf.RecordBatch
@@ -329,6 +332,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr
                        meta:  &md,
                        r:     body,
                        codec: codec,
+                       mem:   mem,
                },
                max: kMaxNestingDepth,
        }
@@ -336,6 +340,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr
        cols := make([]array.Interface, len(schema.Fields()))
        for i, field := range schema.Fields() {
                cols[i] = ctx.loadArray(field.Type)
+               defer cols[i].Release()
        }
 
        return array.NewRecord(schema, cols, rows)
@@ -345,6 +350,7 @@ type ipcSource struct {
        meta  *flatbuf.RecordBatch
        r     ReadAtSeeker
        codec decompressor
+       mem   memory.Allocator
 }
 
 func (src *ipcSource) buffer(i int) *memory.Buffer {
@@ -356,10 +362,10 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
                return memory.NewBufferBytes(nil)
        }
 
-       var raw []byte
+       raw := memory.NewResizableBuffer(src.mem)
        if src.codec == nil {
-               raw = make([]byte, buf.Length())
-               _, err := src.r.ReadAt(raw, buf.Offset())
+               raw.Resize(int(buf.Length()))
+               _, err := src.r.ReadAt(raw.Bytes(), buf.Offset())
                if err != nil {
                        panic(err)
                }
@@ -375,19 +381,19 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
                var r io.Reader = sr
                // check for an uncompressed buffer
                if int64(uncompressedSize) != -1 {
-                       raw = make([]byte, uncompressedSize)
+                       raw.Resize(int(uncompressedSize))
                        src.codec.Reset(sr)
                        r = src.codec
                } else {
-                       raw = make([]byte, buf.Length())
+                       raw.Resize(int(buf.Length()))
                }
 
-               if _, err = io.ReadFull(r, raw); err != nil {
+               if _, err = io.ReadFull(r, raw.Bytes()); err != nil {
                        panic(err)
                }
        }
 
-       return memory.NewBufferBytes(raw)
+       return raw
 }
 
 func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
@@ -507,6 +513,8 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface
                buffers = append(buffers, ctx.buffer())
        }
 
+       defer releaseBuffers(buffers)
+
        data := array.NewData(dt, int(field.Length()), buffers, nil, 
int(field.NullCount()), 0)
        defer data.Release()
 
@@ -516,6 +524,7 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface
 func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
        field, buffers := ctx.loadCommon(3)
        buffers = append(buffers, ctx.buffer(), ctx.buffer())
+       defer releaseBuffers(buffers)
 
        data := array.NewData(dt, int(field.Length()), buffers, nil, 
int(field.NullCount()), 0)
        defer data.Release()
@@ -526,6 +535,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
 func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt 
*arrow.FixedSizeBinaryType) array.Interface {
        field, buffers := ctx.loadCommon(2)
        buffers = append(buffers, ctx.buffer())
+       defer releaseBuffers(buffers)
 
        data := array.NewData(dt, int(field.Length()), buffers, nil, 
int(field.NullCount()), 0)
        defer data.Release()
@@ -536,6 +546,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType
 func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
        field, buffers := ctx.loadCommon(2)
        buffers = append(buffers, ctx.buffer())
+       defer releaseBuffers(buffers)
 
        sub := ctx.loadChild(dt.ValueType())
        defer sub.Release()
@@ -549,6 +560,7 @@ func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
 func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
        field, buffers := ctx.loadCommon(2)
        buffers = append(buffers, ctx.buffer())
+       defer releaseBuffers(buffers)
 
        sub := ctx.loadChild(dt.Elem())
        defer sub.Release()
@@ -561,6 +573,7 @@ func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
 
 func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) 
array.Interface {
        field, buffers := ctx.loadCommon(1)
+       defer releaseBuffers(buffers)
 
        sub := ctx.loadChild(dt.Elem())
        defer sub.Release()
@@ -573,6 +586,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) ar
 
 func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) 
array.Interface {
        field, buffers := ctx.loadCommon(1)
+       defer releaseBuffers(buffers)
 
        arrs := make([]array.Interface, len(dt.Fields()))
        subs := make([]*array.Data, len(dt.Fields()))
@@ -634,3 +648,11 @@ func readDictionary(meta *memory.Buffer, types dictTypeMap, r ReadAtSeeker) (int
 
        panic("not implemented")
 }
+
+func releaseBuffers(buffers []*memory.Buffer) {
+       for _, b := range buffers {
+               if b != nil {
+                       b.Release()
+               }
+       }
+}
diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go
index 2eda586..acaddc2 100644
--- a/go/arrow/ipc/message.go
+++ b/go/arrow/ipc/message.go
@@ -154,11 +154,18 @@ type messageReader struct {
 
        refCount int64
        msg      *Message
+
+       mem memory.Allocator
 }
 
 // NewMessageReader returns a reader that reads messages from an input stream.
-func NewMessageReader(r io.Reader) MessageReader {
-       return &messageReader{r: r, refCount: 1}
+func NewMessageReader(r io.Reader, opts ...Option) MessageReader {
+       cfg := newConfig()
+       for _, opt := range opts {
+               opt(cfg)
+       }
+
+       return &messageReader{r: r, refCount: 1, mem: cfg.alloc}
 }
 
 // Retain increases the reference count by 1.
@@ -224,12 +231,14 @@ func (r *messageReader) Message() (*Message, error) {
        meta := flatbuf.GetRootAsMessage(buf, 0)
        bodyLen := meta.BodyLength()
 
-       buf = make([]byte, bodyLen)
-       _, err = io.ReadFull(r.r, buf)
+       body := memory.NewResizableBuffer(r.mem)
+       defer body.Release()
+       body.Resize(int(bodyLen))
+
+       _, err = io.ReadFull(r.r, body.Bytes())
        if err != nil {
                return nil, xerrors.Errorf("arrow/ipc: could not read message 
body: %w", err)
        }
-       body := memory.NewBufferBytes(buf)
 
        if r.msg != nil {
                r.msg.Release()
diff --git a/go/arrow/ipc/message_test.go b/go/arrow/ipc/message_test.go
new file mode 100644
index 0000000..22a6873
--- /dev/null
+++ b/go/arrow/ipc/message_test.go
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc
+
+import (
+       "bytes"
+       "io"
+       "testing"
+
+       "github.com/apache/arrow/go/v7/arrow"
+       "github.com/apache/arrow/go/v7/arrow/array"
+       "github.com/apache/arrow/go/v7/arrow/memory"
+)
+
+func TestMessageReaderBodyInAllocator(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer mem.AssertSize(t, 0)
+
+       const numRecords = 3
+       buf := writeRecordsIntoBuffer(t, numRecords)
+       r := NewMessageReader(buf, WithAllocator(mem))
+       defer r.Release()
+
+       msgs := make([]*Message, 0)
+       for {
+               m, err := r.Message()
+               if err == io.EOF {
+                       break
+               }
+               if err != nil {
+                       t.Fatal(err)
+               }
+               m.Retain()
+               msgs = append(msgs, m)
+       }
+       if len(msgs) != numRecords+1 {
+               t.Fatalf("expected %d messages but got %d", numRecords+1, 
len(msgs))
+       }
+
+       if mem.CurrentAlloc() <= 0 {
+               t.Fatal("message bodies should have been allocated")
+       }
+
+       for _, m := range msgs {
+               m.Release()
+       }
+}
+
+func writeRecordsIntoBuffer(t *testing.T, numRecords int) *bytes.Buffer {
+       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer mem.AssertSize(t, 0)
+
+       s, recs := getTestRecords(mem, numRecords)
+       buf := new(bytes.Buffer)
+       w := NewWriter(buf, WithAllocator(mem), WithSchema(s))
+       for _, rec := range recs {
+               err := w.Write(rec)
+               rec.Release()
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+       if err := w.Close(); err != nil {
+               t.Fatal(err)
+       }
+       return buf
+}
+
+func getTestRecords(mem memory.Allocator, numRecords int) (*arrow.Schema, 
[]array.Record) {
+       meta := arrow.NewMetadata([]string{}, []string{})
+       s := arrow.NewSchema([]arrow.Field{
+               {Name: "test-col", Type: arrow.PrimitiveTypes.Int64},
+       }, &meta)
+
+       builder := array.NewRecordBuilder(mem, s)
+       defer builder.Release()
+
+       recs := make([]array.Record, numRecords)
+       for i := 0; i < len(recs); i++ {
+               col := builder.Field(0).(*array.Int64Builder)
+               for i := 0; i < 10; i++ {
+                       col.Append(int64(i))
+               }
+               recs[i] = builder.NewRecord()
+       }
+
+       return s, recs
+}
diff --git a/go/arrow/ipc/reader.go b/go/arrow/ipc/reader.go
index 42572b7..1ccfac1 100644
--- a/go/arrow/ipc/reader.go
+++ b/go/arrow/ipc/reader.go
@@ -75,7 +75,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (*Reader, error
 
 // NewReader returns a reader that reads records from an input stream.
 func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
-       return NewReaderFromMessageReader(NewMessageReader(r), opts...)
+       return NewReaderFromMessageReader(NewMessageReader(r, opts...), opts...)
 }
 
 // Err returns the last error encountered during the iteration over the
@@ -176,7 +176,7 @@ func (r *Reader) next() bool {
                return false
        }
 
-       r.rec = newRecord(r.schema, msg.meta, bytes.NewReader(msg.body.Bytes()))
+       r.rec = newRecord(r.schema, msg.meta, 
bytes.NewReader(msg.body.Bytes()), r.mem)
        return true
 }
 
diff --git a/go/arrow/memory/checked_allocator.go b/go/arrow/memory/checked_allocator.go
index da300ae..06be9bd 100644
--- a/go/arrow/memory/checked_allocator.go
+++ b/go/arrow/memory/checked_allocator.go
@@ -21,12 +21,13 @@ import (
        "runtime"
        "strconv"
        "sync"
+       "sync/atomic"
        "unsafe"
 )
 
 type CheckedAllocator struct {
        mem Allocator
-       sz  int
+       sz  int64
 
        allocs sync.Map
 }
@@ -35,10 +36,10 @@ func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
        return &CheckedAllocator{mem: mem}
 }
 
-func (a *CheckedAllocator) CurrentAlloc() int { return a.sz }
+func (a *CheckedAllocator) CurrentAlloc() int { return 
int(atomic.LoadInt64(&a.sz)) }
 
 func (a *CheckedAllocator) Allocate(size int) []byte {
-       a.sz += size
+       atomic.AddInt64(&a.sz, int64(size))
        out := a.mem.Allocate(size)
        if size == 0 {
                return out
@@ -52,7 +53,7 @@ func (a *CheckedAllocator) Allocate(size int) []byte {
 }
 
 func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
-       a.sz += size - len(b)
+       atomic.AddInt64(&a.sz, int64(size-len(b)))
 
        oldptr := uintptr(unsafe.Pointer(&b[0]))
        out := a.mem.Reallocate(size, b)
@@ -69,7 +70,7 @@ func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
 }
 
 func (a *CheckedAllocator) Free(b []byte) {
-       a.sz -= len(b)
+       atomic.AddInt64(&a.sz, int64(len(b)*-1))
        defer a.mem.Free(b)
 
        if len(b) == 0 {
@@ -127,7 +128,7 @@ func (a *CheckedAllocator) AssertSize(t TestingT, sz int) {
                return true
        })
 
-       if a.sz != sz {
+       if int(atomic.LoadInt64(&a.sz)) != sz {
                t.Helper()
                t.Errorf("invalid memory size exp=%d, got=%d", sz, a.sz)
        }
@@ -139,13 +140,15 @@ type CheckedAllocatorScope struct {
 }
 
 func NewCheckedAllocatorScope(alloc *CheckedAllocator) *CheckedAllocatorScope {
-       return &CheckedAllocatorScope{alloc: alloc, sz: alloc.sz}
+       sz := atomic.LoadInt64(&alloc.sz)
+       return &CheckedAllocatorScope{alloc: alloc, sz: int(sz)}
 }
 
 func (c *CheckedAllocatorScope) CheckSize(t TestingT) {
-       if c.sz != c.alloc.sz {
+       sz := int(atomic.LoadInt64(&c.alloc.sz))
+       if c.sz != sz {
                t.Helper()
-               t.Errorf("invalid memory size exp=%d, got=%d", c.sz, c.alloc.sz)
+               t.Errorf("invalid memory size exp=%d, got=%d", c.sz, sz)
        }
 }
 

Reply via email to