This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a9b0be43 Batch of small optimizations (#556)
a9b0be43 is described below

commit a9b0be4305d4e16df2d315ac587dbe9f61d425b7
Author: pixelherodev <[email protected]>
AuthorDate: Fri Oct 31 09:29:35 2025 -0500

    Batch of small optimizations (#556)
    
    This primarily consists of reducing allocations.
    
    e.g. 03be6d3f adds a specific case to TypeEqual for ListView types,
    which avoids reflect.DeepEqual. l.elem.Equal does not allocate;
    reflect.DeepEqual does, so this reduces allocations when comparing a
    ListViewType.
    
    8c7729fc changes from referencing flatbuf internal types by reference to
    by-value, which avoids letting them escape to the heap.
    
    e3406582 switches from reflect.DeepEqual to slices.Equal when comparing
    the child IDs of a Union type, which is functionally equivalent for
    slices and, again, does not allocate.
    
    9eba6d2c does similar for an arrowflight cookie test, replacing
    reflect.DeepEqual with maps.Equal; I did a grep for reflect.DeepEqual
    and noticed it.
    
    7557d15c is probably the most objectionable; it replaces a *float64 with
    float64 directly for minSpaceSavings, since the current nil value has
    identical behavior to zero anyway. It's a tiny cleanup IMO, but doesn't
    really have any practical value.
    
    2d3593f6 is a bit ugly; it inlines a copy of NewReaderFromMessageReader
    and a copy of NewMessageReader into NewReader - by implementing the two
    directly, it's able to avoid allocating two copies of the `config`
    structure, and to avoid looping over and invoking config functions
    twice. It's kinda ugly though, and the `// types: make(dictTypeMap)`
    which is copied over suggests that there's further room for improvement.
    
    I didn't drop the two APIs, as they are exposed to users, but IMO the
    performance win is probably worth it, and it can be cleaned up later if
    needed.
    
    0273ffe8 is a small optimization to the allocator: the Go allocator
    branches on each allocation, but both branches are functionally
    equivalent, so I just merged the two. I doubt this would have a _major_
    impact, since the cost of a branch is negligible compared to the cost of
    a heap allocation, but performance is often won with a lot of small
    impacts IMO :)
    
    Lastly, for now at least, f1a6a331 statically allocates 4 bytes per IPC
    messageReader (I did _not_ touch the ArrowFlight implementation), and
    uses it on every message read, rather than heap-allocating 4 bytes each
    time Message is called.
    
    
    There's more to be done, but this should be a much more practically
    sized chunk to review, and I can add more later :)
    
    ---------
    
    Co-authored-by: Noam Preil <[email protected]>
---
 arrow/compare.go                       |  5 ++++-
 arrow/flight/basic_auth_flight_test.go | 10 ++++++++--
 arrow/flight/cookie_middleware_test.go |  4 ++--
 arrow/ipc/file_reader.go               |  8 ++++----
 arrow/ipc/file_writer.go               |  2 +-
 arrow/ipc/ipc.go                       |  4 ++--
 arrow/ipc/message.go                   |  5 +++--
 arrow/ipc/reader.go                    | 28 ++++++++++++++++++++++++++--
 arrow/ipc/writer.go                    | 21 +++++++++------------
 arrow/ipc/writer_test.go               |  8 ++++----
 arrow/memory/go_allocator.go           |  7 ++-----
 11 files changed, 65 insertions(+), 37 deletions(-)

diff --git a/arrow/compare.go b/arrow/compare.go
index 8966f16c..644de8e1 100644
--- a/arrow/compare.go
+++ b/arrow/compare.go
@@ -18,6 +18,7 @@ package arrow
 
 import (
        "reflect"
+       "slices"
 )
 
 type typeEqualsConfig struct {
@@ -122,7 +123,7 @@ func TypeEqual(left, right DataType, opts 
...TypeEqualOption) bool {
                        return false
                }
 
-               if !reflect.DeepEqual(l.ChildIDs(), r.ChildIDs()) {
+               if !slices.Equal(l.ChildIDs(), r.ChildIDs()) {
                        return false
                }
 
@@ -149,6 +150,8 @@ func TypeEqual(left, right DataType, opts 
...TypeEqualOption) bool {
                r := right.(*RunEndEncodedType)
                return TypeEqual(l.Encoded(), r.Encoded(), opts...) &&
                        TypeEqual(l.runEnds, r.runEnds, opts...)
+       case *ListViewType:
+               return l.elem.Equal(right.(*ListViewType).elem)
        default:
                return reflect.DeepEqual(left, right)
        }
diff --git a/arrow/flight/basic_auth_flight_test.go 
b/arrow/flight/basic_auth_flight_test.go
index 6b59f998..849b25d8 100644
--- a/arrow/flight/basic_auth_flight_test.go
+++ b/arrow/flight/basic_auth_flight_test.go
@@ -73,11 +73,17 @@ func (*validator) IsValid(bearerToken string) (interface{}, 
error) {
 func TestErrorAuths(t *testing.T) {
        unary, stream := 
flight.CreateServerBearerTokenAuthInterceptors(&validator{})
        s := flight.NewFlightServer(grpc.UnaryInterceptor(unary), 
grpc.StreamInterceptor(stream))
-       s.Init("localhost:0")
+       if err := s.Init("localhost:0"); err != nil {
+               panic(err)
+       }
        f := &HeaderAuthTestFlight{}
        s.RegisterFlightService(f)
 
-       go s.Serve()
+       go func() {
+               if err := s.Serve(); err != nil {
+                       panic(err)
+               }
+       }()
        defer s.Shutdown()
 
        client, err := flight.NewFlightClient(s.Addr().String(), nil, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
diff --git a/arrow/flight/cookie_middleware_test.go 
b/arrow/flight/cookie_middleware_test.go
index 86605e66..7f0ce32a 100644
--- a/arrow/flight/cookie_middleware_test.go
+++ b/arrow/flight/cookie_middleware_test.go
@@ -21,9 +21,9 @@ import (
        "errors"
        "fmt"
        "io"
+       "maps"
        "net/http"
        "net/textproto"
-       "reflect"
        "strings"
        "testing"
        "time"
@@ -85,7 +85,7 @@ func (s *serverAddCookieMiddleware) StartCall(ctx 
context.Context) context.Conte
                }
        }
 
-       if !reflect.DeepEqual(s.expectedCookies, got) {
+       if !maps.Equal(s.expectedCookies, got) {
                panic(fmt.Sprintf("did not get expected cookies, expected %+v, 
got %+v", s.expectedCookies, got))
        }
 
diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index 3c4b1b7e..d742d852 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -531,12 +531,12 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
        return raw
 }
 
-func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
+func (src *ipcSource) fieldMetadata(i int) flatbuf.FieldNode {
        var node flatbuf.FieldNode
        if !src.meta.Nodes(&node, i) {
                panic("arrow/ipc: field metadata out of bound")
        }
-       return &node
+       return node
 }
 
 func (src *ipcSource) variadicCount(i int) int64 {
@@ -553,7 +553,7 @@ type arrayLoaderContext struct {
        version   MetadataVersion
 }
 
-func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode {
+func (ctx *arrayLoaderContext) field() flatbuf.FieldNode {
        field := ctx.src.fieldMetadata(ctx.ifield)
        ctx.ifield++
        return field
@@ -647,7 +647,7 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) 
arrow.ArrayData {
        }
 }
 
-func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int) 
(*flatbuf.FieldNode, []*memory.Buffer) {
+func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int) 
(flatbuf.FieldNode, []*memory.Buffer) {
        buffers := make([]*memory.Buffer, 0, nbufs)
        field := ctx.field()
 
diff --git a/arrow/ipc/file_writer.go b/arrow/ipc/file_writer.go
index ff043ec9..5b1320ba 100644
--- a/arrow/ipc/file_writer.go
+++ b/arrow/ipc/file_writer.go
@@ -247,7 +247,7 @@ type FileWriter struct {
        codec           flatbuf.CompressionType
        compressNP      int
        compressors     []compressor
-       minSpaceSavings *float64
+       minSpaceSavings float64
 
        // map of the last written dictionaries by id
        // so we can avoid writing the same dictionary over and over
diff --git a/arrow/ipc/ipc.go b/arrow/ipc/ipc.go
index c4589da6..ca2e1ecd 100644
--- a/arrow/ipc/ipc.go
+++ b/arrow/ipc/ipc.go
@@ -71,7 +71,7 @@ type config struct {
        ensureNativeEndian bool
        noAutoSchema       bool
        emitDictDeltas     bool
-       minSpaceSavings    *float64
+       minSpaceSavings    float64
 }
 
 func newConfig(opts ...Option) *config {
@@ -189,7 +189,7 @@ func WithDictionaryDeltas(v bool) Option {
 // Go and C++ versions prior to 12.0.0.
 func WithMinSpaceSavings(savings float64) Option {
        return func(cfg *config) {
-               cfg.minSpaceSavings = &savings
+               cfg.minSpaceSavings = savings
        }
 }
 
diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go
index c96869ec..a2c4e370 100644
--- a/arrow/ipc/message.go
+++ b/arrow/ipc/message.go
@@ -149,7 +149,8 @@ type messageReader struct {
        refCount atomic.Int64
        msg      *Message
 
-       mem memory.Allocator
+       mem    memory.Allocator
+       header [4]byte
 }
 
 // NewMessageReader returns a reader that reads messages from an input stream.
@@ -188,7 +189,7 @@ func (r *messageReader) Release() {
 // underlying stream.
 // It is valid until the next call to Message.
 func (r *messageReader) Message() (*Message, error) {
-       buf := make([]byte, 4)
+       buf := r.header[:]
        _, err := io.ReadFull(r.r, buf)
        if err != nil {
                return nil, fmt.Errorf("arrow/ipc: could not read continuation 
indicator: %w", err)
diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go
index 9d7096c0..df9e5000 100644
--- a/arrow/ipc/reader.go
+++ b/arrow/ipc/reader.go
@@ -89,8 +89,32 @@ func NewReaderFromMessageReader(r MessageReader, opts 
...Option) (reader *Reader
 }
 
 // NewReader returns a reader that reads records from an input stream.
-func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
-       return NewReaderFromMessageReader(NewMessageReader(r, opts...), opts...)
+func NewReader(r io.Reader, opts ...Option) (rr *Reader, err error) {
+       defer func() {
+               if pErr := recover(); pErr != nil {
+                       err = utils.FormatRecoveredError("arrow/ipc: unknown 
error while reading", pErr)
+               }
+       }()
+       cfg := newConfig(opts...)
+       mr := &messageReader{r: r, mem: cfg.alloc}
+       mr.refCount.Add(1)
+       rr = &Reader{
+               r:        mr,
+               refCount: atomic.Int64{},
+               // types:    make(dictTypeMap),
+               memo:               dictutils.NewMemo(),
+               mem:                cfg.alloc,
+               ensureNativeEndian: cfg.ensureNativeEndian,
+               expectedSchema:     cfg.schema,
+       }
+       rr.refCount.Add(1)
+
+       if !cfg.noAutoSchema {
+               if err := rr.readSchema(cfg.schema); err != nil {
+                       return nil, err
+               }
+       }
+       return rr, nil
 }
 
 // Err returns the last error encountered during the iteration over the
diff --git a/arrow/ipc/writer.go b/arrow/ipc/writer.go
index 2fcf3dfb..9cd10e79 100644
--- a/arrow/ipc/writer.go
+++ b/arrow/ipc/writer.go
@@ -87,7 +87,7 @@ type Writer struct {
        codec           flatbuf.CompressionType
        compressNP      int
        compressors     []compressor
-       minSpaceSavings *float64
+       minSpaceSavings float64
 
        // map of the last written dictionaries by id
        // so we can avoid writing the same dictionary over and over
@@ -328,7 +328,7 @@ type recordEncoder struct {
        codec           flatbuf.CompressionType
        compressNP      int
        compressors     []compressor
-       minSpaceSavings *float64
+       minSpaceSavings float64
 }
 
 func newRecordEncoder(
@@ -338,7 +338,7 @@ func newRecordEncoder(
        allow64b bool,
        codec flatbuf.CompressionType,
        compressNP int,
-       minSpaceSavings *float64,
+       minSpaceSavings float64,
        compressors []compressor,
 ) *recordEncoder {
        return &recordEncoder{
@@ -355,12 +355,12 @@ func newRecordEncoder(
 
 func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool {
        debug.Assert(uncompressed > 0, "uncompressed size is 0")
-       if w.minSpaceSavings == nil {
+       if w.minSpaceSavings == 0 {
                return true
        }
 
        savings := 1.0 - float64(compressed)/float64(uncompressed)
-       return savings >= *w.minSpaceSavings
+       return savings >= w.minSpaceSavings
 }
 
 func (w *recordEncoder) reset() {
@@ -477,13 +477,10 @@ func (w *recordEncoder) encode(p *Payload, rec 
arrow.RecordBatch) error {
        }
 
        if w.codec != -1 {
-               if w.minSpaceSavings != nil {
-                       pct := *w.minSpaceSavings
-                       if pct < 0 || pct > 1 {
-                               p.Release()
-                               return fmt.Errorf("%w: minSpaceSavings not in 
range [0,1]. Provided %.05f",
-                                       arrow.ErrInvalid, pct)
-                       }
+               if w.minSpaceSavings < 0 || w.minSpaceSavings > 1 {
+                       p.Release()
+                       return fmt.Errorf("%w: minSpaceSavings not in range 
[0,1]. Provided %.05f",
+                               arrow.ErrInvalid, w.minSpaceSavings)
                }
                w.compressBodyBuffers(p)
        }
diff --git a/arrow/ipc/writer_test.go b/arrow/ipc/writer_test.go
index c37b904c..c91661a1 100644
--- a/arrow/ipc/writer_test.go
+++ b/arrow/ipc/writer_test.go
@@ -195,7 +195,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
 
        for _, codec := range 
[]flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME, 
flatbuf.CompressionTypeZSTD} {
                compressors := []compressor{getCompressor(codec)}
-               enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil, 
compressors)
+               enc := newRecordEncoder(mem, 0, 5, true, codec, 1, 0, 
compressors)
                var payload Payload
                require.NoError(t, enc.encode(&payload, batch))
                assert.Len(t, payload.body, 2)
@@ -207,7 +207,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
                assert.Greater(t, compressedSize, int64(0))
                expectedSavings := 1.0 - 
float64(compressedSize)/float64(uncompressedSize)
 
-               compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, 
&expectedSavings, compressors)
+               compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, 
expectedSavings, compressors)
                payload.Release()
                payload.body = payload.body[:0]
                require.NoError(t, compressEncoder.encode(&payload, batch))
@@ -220,7 +220,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
                // slightly bump the threshold. the body buffer should now be 
prefixed
                // with -1 and its content left uncompressed
                minSavings := math.Nextafter(expectedSavings, 1.0)
-               compressEncoder.minSpaceSavings = &minSavings
+               compressEncoder.minSpaceSavings = minSavings
                require.NoError(t, compressEncoder.encode(&payload, batch))
                assert.Len(t, payload.body, 2)
                assert.EqualValues(t, -1, prefixedSize(payload.body[1]))
@@ -229,7 +229,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
                payload.body = payload.body[:0]
 
                for _, outOfRange := range []float64{math.Nextafter(1.0, 2.0), 
math.Nextafter(0, -1)} {
-                       compressEncoder.minSpaceSavings = &outOfRange
+                       compressEncoder.minSpaceSavings = outOfRange
                        err := compressEncoder.encode(&payload, batch)
                        assert.ErrorIs(t, err, arrow.ErrInvalid)
                        assert.ErrorContains(t, err, "minSpaceSavings not in 
range [0,1]")
diff --git a/arrow/memory/go_allocator.go b/arrow/memory/go_allocator.go
index 1017eb68..6ccf5f75 100644
--- a/arrow/memory/go_allocator.go
+++ b/arrow/memory/go_allocator.go
@@ -24,11 +24,8 @@ func (a *GoAllocator) Allocate(size int) []byte {
        buf := make([]byte, size+alignment) // padding for 64-byte alignment
        addr := int(addressOf(buf))
        next := roundUpToMultipleOf64(addr)
-       if addr != next {
-               shift := next - addr
-               return buf[shift : size+shift : size+shift]
-       }
-       return buf[:size:size]
+       shift := next - addr
+       return buf[shift : size+shift : size+shift]
 }
 
 func (a *GoAllocator) Reallocate(size int, b []byte) []byte {

Reply via email to