This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new a9b0be43 Batch of small optimizations (#556)
a9b0be43 is described below
commit a9b0be4305d4e16df2d315ac587dbe9f61d425b7
Author: pixelherodev <[email protected]>
AuthorDate: Fri Oct 31 09:29:35 2025 -0500
Batch of small optimizations (#556)
This primarily consists of reducing allocations.
e.g. 03be6d3f adds a specific case to TypeEqual for ListView types,
which avoids reflect.DeepEqual. l.elem.Equal does not allocate;
reflect.DeepEqual does, so this reduces allocations when comparing a
ListViewType.
8c7729fc changes references to flatbuf internal types from by-reference to
by-value, which avoids letting them escape to the heap.
e3406582 switches from reflect.DeepEqual to slices.Equal when comparing
the child IDs of a Union type, which is functionally equivalent for
slices and, again, does not allocate.
9eba6d2c does similar for an arrowflight cookie test, replacing
reflect.DeepEqual with maps.Equal; I did a grep for reflect.DeepEqual
and noticed it.
7557d15c is probably the most objectionable; it replaces a *float64 with
float64 directly for minSpaceSavings, since the current nil value has
identical behavior to zero anyway. It's a tiny cleanup IMO, but doesn't
really have any practical value.
2d3593f6 is a bit ugly; it inlines a copy of NewReaderFromMessageReader
and a copy of NewMessageReader into NewReader - by implementing the two
directly, it's able to avoid allocating two copies of the `config`
structure, and to avoid looping over and invoking config functions
twice. It's kinda ugly though, and the `// types: make(dictTypeMap)`
which is copied over suggests that there's further room for improvement.
I didn't drop the two APIs, as they are exposed to users, but IMO the
performance win is probably worth it, and it can be cleaned up later if
needed.
0273ffe8 is a small optimization to the allocator: the Go allocator
branches on each allocation, but both branches are functionally
equivalent, so I just merged the two. I doubt this would have a _major_
impact, since the cost of a branch is negligible compared to the cost of
a heap allocation, but performance is often won with a lot of small
impacts IMO :)
Lastly, for now at least, f1a6a331 statically allocates 4 bytes per IPC
messageReader (I did _not_ touch the ArrowFlight implementation), and
uses it on every message read, rather than heap-allocating 4 bytes each
time Message is called.
There's more to be done, but this should be a much more practically
sized chunk to review, and I can add more later :)
---------
Co-authored-by: Noam Preil <[email protected]>
---
arrow/compare.go | 5 ++++-
arrow/flight/basic_auth_flight_test.go | 10 ++++++++--
arrow/flight/cookie_middleware_test.go | 4 ++--
arrow/ipc/file_reader.go | 8 ++++----
arrow/ipc/file_writer.go | 2 +-
arrow/ipc/ipc.go | 4 ++--
arrow/ipc/message.go | 5 +++--
arrow/ipc/reader.go | 28 ++++++++++++++++++++++++++--
arrow/ipc/writer.go | 21 +++++++++------------
arrow/ipc/writer_test.go | 8 ++++----
arrow/memory/go_allocator.go | 7 ++-----
11 files changed, 65 insertions(+), 37 deletions(-)
diff --git a/arrow/compare.go b/arrow/compare.go
index 8966f16c..644de8e1 100644
--- a/arrow/compare.go
+++ b/arrow/compare.go
@@ -18,6 +18,7 @@ package arrow
import (
"reflect"
+ "slices"
)
type typeEqualsConfig struct {
@@ -122,7 +123,7 @@ func TypeEqual(left, right DataType, opts
...TypeEqualOption) bool {
return false
}
- if !reflect.DeepEqual(l.ChildIDs(), r.ChildIDs()) {
+ if !slices.Equal(l.ChildIDs(), r.ChildIDs()) {
return false
}
@@ -149,6 +150,8 @@ func TypeEqual(left, right DataType, opts
...TypeEqualOption) bool {
r := right.(*RunEndEncodedType)
return TypeEqual(l.Encoded(), r.Encoded(), opts...) &&
TypeEqual(l.runEnds, r.runEnds, opts...)
+ case *ListViewType:
+ return l.elem.Equal(right.(*ListViewType).elem)
default:
return reflect.DeepEqual(left, right)
}
diff --git a/arrow/flight/basic_auth_flight_test.go
b/arrow/flight/basic_auth_flight_test.go
index 6b59f998..849b25d8 100644
--- a/arrow/flight/basic_auth_flight_test.go
+++ b/arrow/flight/basic_auth_flight_test.go
@@ -73,11 +73,17 @@ func (*validator) IsValid(bearerToken string) (interface{},
error) {
func TestErrorAuths(t *testing.T) {
unary, stream :=
flight.CreateServerBearerTokenAuthInterceptors(&validator{})
s := flight.NewFlightServer(grpc.UnaryInterceptor(unary),
grpc.StreamInterceptor(stream))
- s.Init("localhost:0")
+ if err := s.Init("localhost:0"); err != nil {
+ panic(err)
+ }
f := &HeaderAuthTestFlight{}
s.RegisterFlightService(f)
- go s.Serve()
+ go func() {
+ if err := s.Serve(); err != nil {
+ panic(err)
+ }
+ }()
defer s.Shutdown()
client, err := flight.NewFlightClient(s.Addr().String(), nil,
grpc.WithTransportCredentials(insecure.NewCredentials()))
diff --git a/arrow/flight/cookie_middleware_test.go
b/arrow/flight/cookie_middleware_test.go
index 86605e66..7f0ce32a 100644
--- a/arrow/flight/cookie_middleware_test.go
+++ b/arrow/flight/cookie_middleware_test.go
@@ -21,9 +21,9 @@ import (
"errors"
"fmt"
"io"
+ "maps"
"net/http"
"net/textproto"
- "reflect"
"strings"
"testing"
"time"
@@ -85,7 +85,7 @@ func (s *serverAddCookieMiddleware) StartCall(ctx
context.Context) context.Conte
}
}
- if !reflect.DeepEqual(s.expectedCookies, got) {
+ if !maps.Equal(s.expectedCookies, got) {
panic(fmt.Sprintf("did not get expected cookies, expected %+v,
got %+v", s.expectedCookies, got))
}
diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index 3c4b1b7e..d742d852 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -531,12 +531,12 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
return raw
}
-func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
+func (src *ipcSource) fieldMetadata(i int) flatbuf.FieldNode {
var node flatbuf.FieldNode
if !src.meta.Nodes(&node, i) {
panic("arrow/ipc: field metadata out of bound")
}
- return &node
+ return node
}
func (src *ipcSource) variadicCount(i int) int64 {
@@ -553,7 +553,7 @@ type arrayLoaderContext struct {
version MetadataVersion
}
-func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode {
+func (ctx *arrayLoaderContext) field() flatbuf.FieldNode {
field := ctx.src.fieldMetadata(ctx.ifield)
ctx.ifield++
return field
@@ -647,7 +647,7 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType)
arrow.ArrayData {
}
}
-func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int)
(*flatbuf.FieldNode, []*memory.Buffer) {
+func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int)
(flatbuf.FieldNode, []*memory.Buffer) {
buffers := make([]*memory.Buffer, 0, nbufs)
field := ctx.field()
diff --git a/arrow/ipc/file_writer.go b/arrow/ipc/file_writer.go
index ff043ec9..5b1320ba 100644
--- a/arrow/ipc/file_writer.go
+++ b/arrow/ipc/file_writer.go
@@ -247,7 +247,7 @@ type FileWriter struct {
codec flatbuf.CompressionType
compressNP int
compressors []compressor
- minSpaceSavings *float64
+ minSpaceSavings float64
// map of the last written dictionaries by id
// so we can avoid writing the same dictionary over and over
diff --git a/arrow/ipc/ipc.go b/arrow/ipc/ipc.go
index c4589da6..ca2e1ecd 100644
--- a/arrow/ipc/ipc.go
+++ b/arrow/ipc/ipc.go
@@ -71,7 +71,7 @@ type config struct {
ensureNativeEndian bool
noAutoSchema bool
emitDictDeltas bool
- minSpaceSavings *float64
+ minSpaceSavings float64
}
func newConfig(opts ...Option) *config {
@@ -189,7 +189,7 @@ func WithDictionaryDeltas(v bool) Option {
// Go and C++ versions prior to 12.0.0.
func WithMinSpaceSavings(savings float64) Option {
return func(cfg *config) {
- cfg.minSpaceSavings = &savings
+ cfg.minSpaceSavings = savings
}
}
diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go
index c96869ec..a2c4e370 100644
--- a/arrow/ipc/message.go
+++ b/arrow/ipc/message.go
@@ -149,7 +149,8 @@ type messageReader struct {
refCount atomic.Int64
msg *Message
- mem memory.Allocator
+ mem memory.Allocator
+ header [4]byte
}
// NewMessageReader returns a reader that reads messages from an input stream.
@@ -188,7 +189,7 @@ func (r *messageReader) Release() {
// underlying stream.
// It is valid until the next call to Message.
func (r *messageReader) Message() (*Message, error) {
- buf := make([]byte, 4)
+ buf := r.header[:]
_, err := io.ReadFull(r.r, buf)
if err != nil {
return nil, fmt.Errorf("arrow/ipc: could not read continuation
indicator: %w", err)
diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go
index 9d7096c0..df9e5000 100644
--- a/arrow/ipc/reader.go
+++ b/arrow/ipc/reader.go
@@ -89,8 +89,32 @@ func NewReaderFromMessageReader(r MessageReader, opts
...Option) (reader *Reader
}
// NewReader returns a reader that reads records from an input stream.
-func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
- return NewReaderFromMessageReader(NewMessageReader(r, opts...), opts...)
+func NewReader(r io.Reader, opts ...Option) (rr *Reader, err error) {
+ defer func() {
+ if pErr := recover(); pErr != nil {
+ err = utils.FormatRecoveredError("arrow/ipc: unknown
error while reading", pErr)
+ }
+ }()
+ cfg := newConfig(opts...)
+ mr := &messageReader{r: r, mem: cfg.alloc}
+ mr.refCount.Add(1)
+ rr = &Reader{
+ r: mr,
+ refCount: atomic.Int64{},
+ // types: make(dictTypeMap),
+ memo: dictutils.NewMemo(),
+ mem: cfg.alloc,
+ ensureNativeEndian: cfg.ensureNativeEndian,
+ expectedSchema: cfg.schema,
+ }
+ rr.refCount.Add(1)
+
+ if !cfg.noAutoSchema {
+ if err := rr.readSchema(cfg.schema); err != nil {
+ return nil, err
+ }
+ }
+ return rr, nil
}
// Err returns the last error encountered during the iteration over the
diff --git a/arrow/ipc/writer.go b/arrow/ipc/writer.go
index 2fcf3dfb..9cd10e79 100644
--- a/arrow/ipc/writer.go
+++ b/arrow/ipc/writer.go
@@ -87,7 +87,7 @@ type Writer struct {
codec flatbuf.CompressionType
compressNP int
compressors []compressor
- minSpaceSavings *float64
+ minSpaceSavings float64
// map of the last written dictionaries by id
// so we can avoid writing the same dictionary over and over
@@ -328,7 +328,7 @@ type recordEncoder struct {
codec flatbuf.CompressionType
compressNP int
compressors []compressor
- minSpaceSavings *float64
+ minSpaceSavings float64
}
func newRecordEncoder(
@@ -338,7 +338,7 @@ func newRecordEncoder(
allow64b bool,
codec flatbuf.CompressionType,
compressNP int,
- minSpaceSavings *float64,
+ minSpaceSavings float64,
compressors []compressor,
) *recordEncoder {
return &recordEncoder{
@@ -355,12 +355,12 @@ func newRecordEncoder(
func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool {
debug.Assert(uncompressed > 0, "uncompressed size is 0")
- if w.minSpaceSavings == nil {
+ if w.minSpaceSavings == 0 {
return true
}
savings := 1.0 - float64(compressed)/float64(uncompressed)
- return savings >= *w.minSpaceSavings
+ return savings >= w.minSpaceSavings
}
func (w *recordEncoder) reset() {
@@ -477,13 +477,10 @@ func (w *recordEncoder) encode(p *Payload, rec
arrow.RecordBatch) error {
}
if w.codec != -1 {
- if w.minSpaceSavings != nil {
- pct := *w.minSpaceSavings
- if pct < 0 || pct > 1 {
- p.Release()
- return fmt.Errorf("%w: minSpaceSavings not in
range [0,1]. Provided %.05f",
- arrow.ErrInvalid, pct)
- }
+ if w.minSpaceSavings < 0 || w.minSpaceSavings > 1 {
+ p.Release()
+ return fmt.Errorf("%w: minSpaceSavings not in range
[0,1]. Provided %.05f",
+ arrow.ErrInvalid, w.minSpaceSavings)
}
w.compressBodyBuffers(p)
}
diff --git a/arrow/ipc/writer_test.go b/arrow/ipc/writer_test.go
index c37b904c..c91661a1 100644
--- a/arrow/ipc/writer_test.go
+++ b/arrow/ipc/writer_test.go
@@ -195,7 +195,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
for _, codec := range
[]flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME,
flatbuf.CompressionTypeZSTD} {
compressors := []compressor{getCompressor(codec)}
- enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil,
compressors)
+ enc := newRecordEncoder(mem, 0, 5, true, codec, 1, 0,
compressors)
var payload Payload
require.NoError(t, enc.encode(&payload, batch))
assert.Len(t, payload.body, 2)
@@ -207,7 +207,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
assert.Greater(t, compressedSize, int64(0))
expectedSavings := 1.0 -
float64(compressedSize)/float64(uncompressedSize)
- compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1,
&expectedSavings, compressors)
+ compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1,
expectedSavings, compressors)
payload.Release()
payload.body = payload.body[:0]
require.NoError(t, compressEncoder.encode(&payload, batch))
@@ -220,7 +220,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
// slightly bump the threshold. the body buffer should now be
prefixed
// with -1 and its content left uncompressed
minSavings := math.Nextafter(expectedSavings, 1.0)
- compressEncoder.minSpaceSavings = &minSavings
+ compressEncoder.minSpaceSavings = minSavings
require.NoError(t, compressEncoder.encode(&payload, batch))
assert.Len(t, payload.body, 2)
assert.EqualValues(t, -1, prefixedSize(payload.body[1]))
@@ -229,7 +229,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
payload.body = payload.body[:0]
for _, outOfRange := range []float64{math.Nextafter(1.0, 2.0),
math.Nextafter(0, -1)} {
- compressEncoder.minSpaceSavings = &outOfRange
+ compressEncoder.minSpaceSavings = outOfRange
err := compressEncoder.encode(&payload, batch)
assert.ErrorIs(t, err, arrow.ErrInvalid)
assert.ErrorContains(t, err, "minSpaceSavings not in
range [0,1]")
diff --git a/arrow/memory/go_allocator.go b/arrow/memory/go_allocator.go
index 1017eb68..6ccf5f75 100644
--- a/arrow/memory/go_allocator.go
+++ b/arrow/memory/go_allocator.go
@@ -24,11 +24,8 @@ func (a *GoAllocator) Allocate(size int) []byte {
buf := make([]byte, size+alignment) // padding for 64-byte alignment
addr := int(addressOf(buf))
next := roundUpToMultipleOf64(addr)
- if addr != next {
- shift := next - addr
- return buf[shift : size+shift : size+shift]
- }
- return buf[:size:size]
+ shift := next - addr
+ return buf[shift : size+shift : size+shift]
}
func (a *GoAllocator) Reallocate(size int, b []byte) []byte {