This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new f6575665 perf: optimize compute.Take for fewer memory allocations (#557)
f6575665 is described below
commit f6575665bbf95d92795c37a1a792682d9da69b08
Author: Alex <[email protected]>
AuthorDate: Fri Oct 31 08:48:05 2025 -0600
perf: optimize compute.Take for fewer memory allocations (#557)
### Rationale for this change
When writing data to partitioned Iceberg tables using `iceberg-go`'s new
partitioned write capability, I found that Arrow's `compute.Take`
operation was causing the writes to be significantly slower than
unpartitioned writes.
Causes:
- The `VarBinaryImpl` function here in `arrow-go` was pre-allocating the
data buffer with only `meanValueLen` (the size of just one string/buffer),
not the total size needed for all strings. For my use case (writing
100k rows at a time), this caused ~15 buffer reallocations as the
buffer incrementally scaled up. For my Iceberg table with 20+
string/binary columns of various sizes, this is significant overhead.
- When `VarBinaryImpl` allocated additional space for new items, it
would add only the exact amount needed for the new value, causing
O(n) reallocations (see the sketch after this list).
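To make the second cause concrete, here is a minimal sketch comparing the
two growth policies. The byte-slice buffer and the `appendAll`/`grow`
helpers are hypothetical stand-ins for the internal `dataBuilder`, not
arrow-go APIs:

```go
package main

import "fmt"

// appendAll appends every value into one buffer, calling grow whenever
// capacity runs out, and returns how many reallocations that policy needed.
func appendAll(values [][]byte, grow func(buf []byte, need int) []byte) (reallocs int) {
	var buf []byte
	for _, v := range values {
		if need := len(buf) + len(v); need > cap(buf) {
			buf = grow(buf, need)
			reallocs++
		}
		buf = append(buf, v...)
	}
	return reallocs
}

func main() {
	// 100k values of ~20 bytes each, similar to the partitioned-write case.
	values := make([][]byte, 100000)
	for i := range values {
		values[i] = make([]byte, 20)
	}

	// Old behavior: reserve exactly the bytes the next value needs.
	exact := func(buf []byte, need int) []byte {
		next := make([]byte, len(buf), need)
		copy(next, buf)
		return next
	}

	// New behavior: double the capacity until the value fits.
	doubling := func(buf []byte, need int) []byte {
		newCap := cap(buf)
		if newCap == 0 {
			newCap = need
		}
		for newCap < need {
			newCap *= 2
		}
		next := make([]byte, len(buf), newCap)
		copy(next, buf)
		return next
	}

	fmt.Println("exact-size growth:", appendAll(values, exact), "reallocations")
	fmt.Println("doubling growth:  ", appendAll(values, doubling), "reallocations")
}
```

With exact-size growth every append triggers a reallocation (~100k of
them); with doubling the count drops to roughly log2(2 MB / 20 B) ≈ 18.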
### What changes are included in this PR?
1. Pre-allocate upfront with a better estimate of the necessary buffer
size to eliminate repeated reallocations.
I somewhat arbitrarily chose a cap of 16 MB as a guess at what would be
effective at reducing the number of allocations, while staying cognizant
that the library is used in many bespoke scenarios where a large
speculative allocation could cause memory spikes. For my use case, I
never hit the 16 MB threshold, so it could be smaller.
I am curious to hear your input on whether there should be a cap at all,
and if so, what a reasonable cap would be.
2. Use exponential (doubling) growth for additional allocations, giving
O(log n) total reallocations.
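For reference, change 1 boils down to the estimate below. This is a
standalone sketch mirroring the diff that follows; `estimatePreAlloc` is
a hypothetical name for illustration, not a function in the patch:

```go
// estimatePreAlloc scales the mean value length by the number of output
// rows and caps the reservation at 16 MB, so a single Take call cannot
// trigger a massive speculative allocation. Uses the built-in min (Go 1.21+).
func estimatePreAlloc(dataLength, numValues, outputLength int) int {
	const maxPreAlloc = 16 << 20 // 16 MB
	meanValueLen := float64(dataLength) / float64(numValues)
	estimated := int(meanValueLen * float64(outputLength))
	return min(estimated, maxPreAlloc)
}
```

For example, taking 100k rows from a column with a 50-byte mean value
length reserves ~5 MB up front, comfortably under the cap.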
### Are these changes tested?
No new unit tests, though benchmarks covering this path are added in
`arrow/compute/vector_selection_test.go` (see the diff below).
Additionally, with a dedicated reproducing script that mimics my use case
at different write sizes (on a MacBook Pro M3 Max with 64 GB RAM),
averaged over 3 `table.Append()` calls:
| Configuration | 100k rows | 500k rows | 1M rows | 2.5M rows | 10M rows |
|---------------|-----------|-----------|---------|-----------|----------|
| **Before**    | 4.1s      | 53.3s     | didn't try | didn't try | didn't try |
| **Change 1**  | 336ms     | 2.41s     | 8.75s   | cancelled after 5 mins | didn't try |
| **Change 2**  | 227ms     | 897ms     | 1.72s   | 4.10s     | 17.1s    |
### Are there any user-facing changes?
No; the operation is just more performant.
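For context, here is a usage sketch of the public entry point this change
speeds up. The `v18` module path is an assumption on my part; the
`TakeArray` call matches its use in the benchmarks below:

```go
package main

import (
	"context"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/compute"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	ctx := compute.WithAllocator(context.Background(), mem)

	// Source strings whose bytes Take must copy into a freshly grown buffer.
	sb := array.NewStringBuilder(mem)
	defer sb.Release()
	sb.AppendValues([]string{"a", "bb", "ccc", "dddd"}, nil)
	values := sb.NewArray()
	defer values.Release()

	// Reversed indices force full data movement, the worst case for the
	// variable-length copy path optimized here.
	ib := array.NewInt64Builder(mem)
	defer ib.Release()
	ib.AppendValues([]int64{3, 2, 1, 0}, nil)
	indices := ib.NewArray()
	defer indices.Release()

	out, err := compute.TakeArray(ctx, values, indices)
	if err != nil {
		panic(err)
	}
	defer out.Release()
	fmt.Println(out) // ["dddd" "ccc" "bb" "a"]
}
```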
---
arrow/compute/internal/kernels/vector_selection.go | 24 ++-
arrow/compute/vector_selection_test.go | 215 +++++++++++++++++++++
2 files changed, 237 insertions(+), 2 deletions(-)
diff --git a/arrow/compute/internal/kernels/vector_selection.go b/arrow/compute/internal/kernels/vector_selection.go
index 9bbc8635..1d6c2187 100644
--- a/arrow/compute/internal/kernels/vector_selection.go
+++ b/arrow/compute/internal/kernels/vector_selection.go
@@ -1486,7 +1486,12 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
if values.Len > 0 {
dataLength := rawOffsets[values.Len] - rawOffsets[0]
meanValueLen := float64(dataLength) / float64(values.Len)
- dataBuilder.reserve(int(meanValueLen))
+ estimatedTotalSize := int(meanValueLen * float64(outputLength))
+
+ // Cap the pre-allocation at a reasonable size
+ const maxPreAlloc = 16777216 // 16 MB
+ estimatedTotalSize = min(estimatedTotalSize, maxPreAlloc)
+ dataBuilder.reserve(estimatedTotalSize)
}
offsetBuilder.reserve(int(outputLength) + 1)
@@ -1503,7 +1508,22 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
}
offset += valSize
if valSize > OffsetT(spaceAvail) {
- dataBuilder.reserve(int(valSize))
+ // Calculate how much total capacity we need
+ needed := dataBuilder.len() + int(valSize)
+ newCap := dataBuilder.cap()
+
+ // Double capacity until we have enough space
+ // This gives us O(log n) reallocations instead of O(n)
+ if newCap == 0 {
+ newCap = int(valSize)
+ }
+ for newCap < needed {
+ newCap = newCap * 2
+ }
+
+ // Reserve the additional capacity
+ additional := newCap - dataBuilder.len()
+ dataBuilder.reserve(additional)
spaceAvail = dataBuilder.cap() - dataBuilder.len()
}
dataBuilder.unsafeAppendSlice(rawData[valOffset : valOffset+valSize])
diff --git a/arrow/compute/vector_selection_test.go b/arrow/compute/vector_selection_test.go
index af3ba569..475d3821 100644
--- a/arrow/compute/vector_selection_test.go
+++ b/arrow/compute/vector_selection_test.go
@@ -1650,3 +1650,218 @@ func TestFilterKernels(t *testing.T) {
suite.Run(t, new(FilterKernelWithChunked))
suite.Run(t, new(FilterKernelWithTable))
}
+
+// Benchmark tests for Take operation with variable-length data
+// These benchmarks test the performance improvements from buffer pre-allocation
+// in VarBinaryImpl for string/binary data reorganization (e.g., partitioning).
+
+func BenchmarkTakeString(b *testing.B) {
+ // Test various batch sizes and string lengths
+ benchmarks := []struct {
+ name string
+ numRows int64
+ avgStrLen int
+ stringType arrow.DataType
+ }{
+ {"SmallBatch_ShortStrings", 1000, 10, arrow.BinaryTypes.String},
+ {"MediumBatch_ShortStrings", 10000, 10,
arrow.BinaryTypes.String},
+ {"LargeBatch_ShortStrings", 50000, 10,
arrow.BinaryTypes.String},
+ {"XLargeBatch_ShortStrings", 100000, 10,
arrow.BinaryTypes.String},
+ {"SmallBatch_MediumStrings", 1000, 50,
arrow.BinaryTypes.String},
+ {"MediumBatch_MediumStrings", 10000, 50,
arrow.BinaryTypes.String},
+ {"LargeBatch_MediumStrings", 50000, 50,
arrow.BinaryTypes.String},
+ {"XLargeBatch_MediumStrings", 100000, 50,
arrow.BinaryTypes.String},
+ {"LargeBatch_ShortStrings_Large", 50000, 10,
arrow.BinaryTypes.LargeString},
+ {"XLargeBatch_MediumStrings_Large", 100000, 50,
arrow.BinaryTypes.LargeString},
+ }
+
+ for _, bm := range benchmarks {
+ b.Run(bm.name, func(b *testing.B) {
+ mem := memory.NewGoAllocator()
+ ctx := compute.WithAllocator(context.Background(), mem)
+
+ // Create source array with strings of specified average length
+ bldr := array.NewBuilder(mem, bm.stringType)
+ defer bldr.Release()
+
+ // Generate test data
+ for i := int64(0); i < bm.numRows; i++ {
+ // Create varied string content
+ str := fmt.Sprintf("value_%d_%s", i,
strings.Repeat("x", bm.avgStrLen-10))
+ switch b := bldr.(type) {
+ case *array.StringBuilder:
+ b.Append(str)
+ case *array.LargeStringBuilder:
+ b.Append(str)
+ }
+ }
+ values := bldr.NewArray()
+ defer values.Release()
+
+ // Create indices that simulate partitioning/reorganization
+ // Use a pattern that would be common in partitioned writes:
+ // reverse order to maximize data movement
+ indicesBldr := array.NewInt64Builder(mem)
+ defer indicesBldr.Release()
+ for i := bm.numRows - 1; i >= 0; i-- {
+ indicesBldr.Append(i)
+ }
+ indices := indicesBldr.NewArray()
+ defer indices.Release()
+
+ // Reset timer after setup
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ // Run benchmark
+ for i := 0; i < b.N; i++ {
+ result, err := compute.TakeArray(ctx, values, indices)
+ if err != nil {
+ b.Fatal(err)
+ }
+ result.Release()
+ }
+
+ // Report throughput
+ b.ReportMetric(float64(bm.numRows*int64(b.N))/b.Elapsed().Seconds(), "rows/sec")
+ })
+ }
+}
+
+func BenchmarkTakeStringPartitionPattern(b *testing.B) {
+ // Simulate real-world partitioning workload where data is reorganized
+ // into multiple partitions (e.g., by timestamp month + host)
+ mem := memory.NewGoAllocator()
+ ctx := compute.WithAllocator(context.Background(), mem)
+
+ const numRows = 50000
+ const numPartitions = 8
+ const avgStrLen = 20
+
+ // Create source data
+ bldr := array.NewLargeStringBuilder(mem)
+ defer bldr.Release()
+ for i := 0; i < numRows; i++ {
+ str := fmt.Sprintf("host_%d_path_%s", i%100,
strings.Repeat("x", avgStrLen-15))
+ bldr.Append(str)
+ }
+ values := bldr.NewArray()
+ defer values.Release()
+
+ // Create indices that simulate partitioning by interleaving
+ // (every Nth row goes to partition N)
+ indicesBldr := array.NewInt64Builder(mem)
+ defer indicesBldr.Release()
+ for partition := 0; partition < numPartitions; partition++ {
+ for i := partition; i < numRows; i += numPartitions {
+ indicesBldr.Append(int64(i))
+ }
+ }
+ indices := indicesBldr.NewArray()
+ defer indices.Release()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ result, err := compute.TakeArray(ctx, values, indices)
+ if err != nil {
+ b.Fatal(err)
+ }
+ result.Release()
+ }
+
+ b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
+
+func BenchmarkTakeMultiColumn(b *testing.B) {
+ // Benchmark Take on a record batch with multiple string columns
+ // to simulate real-world use cases (e.g., CloudFront logs with 20+ string columns)
+ mem := memory.NewGoAllocator()
+ ctx := compute.WithAllocator(context.Background(), mem)
+
+ const numRows = 50000
+ const numStringCols = 20
+ const avgStrLen = 15
+
+ // Build schema with multiple string columns
+ fields := make([]arrow.Field, numStringCols+3)
+ for i := 0; i < numStringCols; i++ {
+ fields[i] = arrow.Field{
+ Name: fmt.Sprintf("string_col_%d", i),
+ Type: arrow.BinaryTypes.LargeString,
+ }
+ }
+ fields[numStringCols] = arrow.Field{Name: "int_col", Type: arrow.PrimitiveTypes.Int64}
+ fields[numStringCols+1] = arrow.Field{Name: "float_col", Type: arrow.PrimitiveTypes.Float64}
+ fields[numStringCols+2] = arrow.Field{Name: "timestamp_col", Type: arrow.FixedWidthTypes.Timestamp_us}
+ schema := arrow.NewSchema(fields, nil)
+
+ // Build arrays
+ arrays := make([]arrow.Array, len(fields))
+ defer func() {
+ for _, arr := range arrays {
+ if arr != nil {
+ arr.Release()
+ }
+ }
+ }()
+
+ // Create string columns
+ for col := 0; col < numStringCols; col++ {
+ bldr := array.NewLargeStringBuilder(mem)
+ for i := 0; i < numRows; i++ {
+ str := fmt.Sprintf("col%d_val%d_%s", col, i,
strings.Repeat("x", avgStrLen-15))
+ bldr.Append(str)
+ }
+ arrays[col] = bldr.NewArray()
+ bldr.Release()
+ }
+
+ // Create numeric columns
+ intBldr := array.NewInt64Builder(mem)
+ floatBldr := array.NewFloat64Builder(mem)
+ tsBldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_us.(*arrow.TimestampType))
+ for i := 0; i < numRows; i++ {
+ intBldr.Append(int64(i))
+ floatBldr.Append(float64(i) * 1.5)
+ tsBldr.Append(arrow.Timestamp(1700000000000000 + int64(i)*1000000))
+ }
+ arrays[numStringCols] = intBldr.NewArray()
+ arrays[numStringCols+1] = floatBldr.NewArray()
+ arrays[numStringCols+2] = tsBldr.NewArray()
+ intBldr.Release()
+ floatBldr.Release()
+ tsBldr.Release()
+
+ // Create record batch
+ batch := array.NewRecordBatch(schema, arrays, numRows)
+ defer batch.Release()
+
+ // Create indices for partitioning pattern
+ indicesBldr := array.NewInt64Builder(mem)
+ defer indicesBldr.Release()
+ // Reverse order to maximize data movement
+ for i := numRows - 1; i >= 0; i-- {
+ indicesBldr.Append(int64(i))
+ }
+ indices := indicesBldr.NewArray()
+ defer indices.Release()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ batchDatum := compute.NewDatum(batch)
+ indicesDatum := compute.NewDatum(indices)
+ result, err := compute.Take(ctx, *compute.DefaultTakeOptions(), batchDatum, indicesDatum)
+ if err != nil {
+ b.Fatal(err)
+ }
+ result.Release()
+ batchDatum.Release()
+ indicesDatum.Release()
+ }
+
+ b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}