This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f6575665 perf: optimize compute.Take for fewer memory allocations (#557)
f6575665 is described below

commit f6575665bbf95d92795c37a1a792682d9da69b08
Author: Alex <[email protected]>
AuthorDate: Fri Oct 31 08:48:05 2025 -0600

    perf: optimize compute.Take for fewer memory allocations (#557)
    
    ### Rationale for this change
    
    When writing data to partitioned Iceberg tables using `iceberg-go`'s new
    partitioned write capability, I found that Arrow's `compute.Take`
    operation was causing the writes to be significantly slower than
    unpartitioned writes.
    
    Causes:
    - the `VarBinaryImpl` function here in `arrow-go` was pre-allocating the
    data buffer with only `meanValueLen` (the size of just one string/buffer),
    not the total size needed for all strings. For my use case (writing
    100k rows at a time) this caused ~15 buffer reallocations as the
    buffer incrementally scaled up. For my iceberg table with 20+
    string/binary columns of various sizes, this is significant overhead.
    - when `VarBinaryImpl` allocated additional space for new items, it
    would only add the exact amount needed for the new value. This caused
    O(n) reallocations (both growth strategies are contrasted in the
    sketch after this list).
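    
    As a standalone sketch of the two growth strategies (the helper names
    here are hypothetical, not the actual `dataBuilder` internals): growing
    the buffer by exactly the shortfall triggers one reallocation per
    oversized append, while doubling the capacity amortizes the copies.
    
    ```go
    package main
    
    import "fmt"
    
    // appendExact grows the buffer by exactly the shortfall, as the old code
    // effectively did: one reallocation (and full copy) per oversized append.
    func appendExact(buf, val []byte) ([]byte, bool) {
        grew := false
        if cap(buf)-len(buf) < len(val) {
            newBuf := make([]byte, len(buf), len(buf)+len(val)) // exact fit
            copy(newBuf, buf)
            buf, grew = newBuf, true
        }
        return append(buf, val...), grew
    }
    
    // appendDoubling doubles the capacity until the value fits, so a total
    // payload of n bytes is copied at most O(log n) times.
    func appendDoubling(buf, val []byte) ([]byte, bool) {
        grew := false
        if cap(buf)-len(buf) < len(val) {
            newCap := cap(buf)
            if newCap == 0 {
                newCap = len(val)
            }
            for newCap < len(buf)+len(val) {
                newCap *= 2
            }
            newBuf := make([]byte, len(buf), newCap)
            copy(newBuf, buf)
            buf, grew = newBuf, true
        }
        return append(buf, val...), grew
    }
    
    func main() {
        val := make([]byte, 50) // ~50-byte values (hypothetical, for illustration)
        var exact, doubling []byte
        exactGrows, doublingGrows := 0, 0
        for i := 0; i < 100_000; i++ { // 100k rows, as in the use case above
            var grew bool
            exact, grew = appendExact(exact, val)
            if grew {
                exactGrows++
            }
            doubling, grew = appendDoubling(doubling, val)
            if grew {
                doublingGrows++
            }
        }
        fmt.Println("exact growth reallocations:   ", exactGrows)    // 100000
        fmt.Println("doubling growth reallocations:", doublingGrows) // ~18
    }
    ```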
    
    ### What changes are included in this PR?
    
    1. Pre-allocate upfront with a better estimate of the necessary buffer
    size to eliminate repeated reallocations.
    
    I somewhat arbitrarily chose a cap of 16MB as a guess at what would be
    effective at reducing the number of allocations, while staying cognizant
    that the library is used in many bespoke scenarios and avoiding massive
    memory spikes. For my use case, I never hit this 16MB threshold, so it
    could be smaller.
    
    I am curious for your input on whether there should be a cap at all
    and, if so, what a reasonable value would be.
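    
    For a sense of scale using the numbers above: 100k rows with a mean
    value length of 50 bytes yields an estimate of 100,000 × 50 B ≈ 5 MB,
    comfortably under the cap; a mean length of 200 bytes (≈ 19 MB) would
    be clamped to 16MB and grow from there.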
    
    2. Use exponential growth for additional allocations, yielding O(log n)
    total reallocations.
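    
    Doubling means the buffer is reallocated only when capacity is
    exhausted, so a final payload of n bytes is copied O(log n) times and
    the total bytes copied stay O(n) amortized.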
    
    ### Are these changes tested?
    
    No unit tests, though the diff below adds benchmarks covering this path.
    
    However, here are results from a dedicated reproducing script that
    mimics my use case at different write sizes (on a MacBook Pro M3 Max
    with 64GB RAM), averaged over 3 `table.Append()` calls:
    
    | Configuration | 100k rows | 500k rows | 1M rows | 2.5M rows | 10M rows |
    |---------------|-----------|-----------|---------|-----------|----------|
    | **Before** | 4.1s | 53.3s | didn't try | didn't try | didn't try |
    | **Change 1** | 336ms | 2.41s | 8.75s | cancelled after 5mins | didn't try |
    | **Change 2** | 227ms | 897ms | 1.72s | 4.10s | 17.1s |
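    
    For reference, the benchmarks added in the diff below can be run with
    standard Go tooling (the package path is assumed from the diff):
    
    ```sh
    go test -bench 'BenchmarkTake' -benchmem ./arrow/compute/
    ```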
    
    ### Are there any user-facing changes?
    
    No; it is just more performant.
---
 arrow/compute/internal/kernels/vector_selection.go |  24 ++-
 arrow/compute/vector_selection_test.go             | 215 +++++++++++++++++++++
 2 files changed, 237 insertions(+), 2 deletions(-)

diff --git a/arrow/compute/internal/kernels/vector_selection.go b/arrow/compute/internal/kernels/vector_selection.go
index 9bbc8635..1d6c2187 100644
--- a/arrow/compute/internal/kernels/vector_selection.go
+++ b/arrow/compute/internal/kernels/vector_selection.go
@@ -1486,7 +1486,12 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
        if values.Len > 0 {
                dataLength := rawOffsets[values.Len] - rawOffsets[0]
                meanValueLen := float64(dataLength) / float64(values.Len)
-               dataBuilder.reserve(int(meanValueLen))
+               estimatedTotalSize := int(meanValueLen * float64(outputLength))
+
+               // Cap the pre-allocation at a reasonable size
+               const maxPreAlloc = 16777216 // 16 MB
+               estimatedTotalSize = min(estimatedTotalSize, maxPreAlloc)
+               dataBuilder.reserve(estimatedTotalSize)
        }
 
        offsetBuilder.reserve(int(outputLength) + 1)
@@ -1503,7 +1508,22 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
                        }
                        offset += valSize
                        if valSize > OffsetT(spaceAvail) {
-                               dataBuilder.reserve(int(valSize))
+                               // Calculate how much total capacity we need
+                               needed := dataBuilder.len() + int(valSize)
+                               newCap := dataBuilder.cap()
+
+                               // Double capacity until we have enough space
+                               // This gives us O(log n) reallocations instead of O(n)
+                               if newCap == 0 {
+                                       newCap = int(valSize)
+                               }
+                               for newCap < needed {
+                                       newCap = newCap * 2
+                               }
+
+                               // Reserve the additional capacity
+                               additional := newCap - dataBuilder.len()
+                               dataBuilder.reserve(additional)
                                spaceAvail = dataBuilder.cap() - dataBuilder.len()
                        }
                        dataBuilder.unsafeAppendSlice(rawData[valOffset : valOffset+valSize])
diff --git a/arrow/compute/vector_selection_test.go b/arrow/compute/vector_selection_test.go
index af3ba569..475d3821 100644
--- a/arrow/compute/vector_selection_test.go
+++ b/arrow/compute/vector_selection_test.go
@@ -1650,3 +1650,218 @@ func TestFilterKernels(t *testing.T) {
        suite.Run(t, new(FilterKernelWithChunked))
        suite.Run(t, new(FilterKernelWithTable))
 }
+
+// Benchmark tests for Take operation with variable-length data
+// These benchmarks test the performance improvements from buffer pre-allocation
+// in VarBinaryImpl for string/binary data reorganization (e.g., partitioning).
+
+func BenchmarkTakeString(b *testing.B) {
+       // Test various batch sizes and string lengths
+       benchmarks := []struct {
+               name       string
+               numRows    int64
+               avgStrLen  int
+               stringType arrow.DataType
+       }{
+               {"SmallBatch_ShortStrings", 1000, 10, arrow.BinaryTypes.String},
+               {"MediumBatch_ShortStrings", 10000, 10, arrow.BinaryTypes.String},
+               {"LargeBatch_ShortStrings", 50000, 10, arrow.BinaryTypes.String},
+               {"XLargeBatch_ShortStrings", 100000, 10, arrow.BinaryTypes.String},
+               {"SmallBatch_MediumStrings", 1000, 50, arrow.BinaryTypes.String},
+               {"MediumBatch_MediumStrings", 10000, 50, arrow.BinaryTypes.String},
+               {"LargeBatch_MediumStrings", 50000, 50, arrow.BinaryTypes.String},
+               {"XLargeBatch_MediumStrings", 100000, 50, arrow.BinaryTypes.String},
+               {"LargeBatch_ShortStrings_Large", 50000, 10, arrow.BinaryTypes.LargeString},
+               {"XLargeBatch_MediumStrings_Large", 100000, 50, arrow.BinaryTypes.LargeString},
+       }
+
+       for _, bm := range benchmarks {
+               b.Run(bm.name, func(b *testing.B) {
+                       mem := memory.NewGoAllocator()
+                       ctx := compute.WithAllocator(context.Background(), mem)
+
+                       // Create source array with strings of specified average length
+                       bldr := array.NewBuilder(mem, bm.stringType)
+                       defer bldr.Release()
+
+                       // Generate test data
+                       for i := int64(0); i < bm.numRows; i++ {
+                               // Create varied string content
+                               str := fmt.Sprintf("value_%d_%s", i, strings.Repeat("x", bm.avgStrLen-10))
+                               switch b := bldr.(type) {
+                               case *array.StringBuilder:
+                                       b.Append(str)
+                               case *array.LargeStringBuilder:
+                                       b.Append(str)
+                               }
+                       }
+                       values := bldr.NewArray()
+                       defer values.Release()
+
+                       // Create indices that simulate partitioning/reorganization
+                       // Use a pattern that would be common in partitioned writes:
+                       // reverse order to maximize data movement
+                       indicesBldr := array.NewInt64Builder(mem)
+                       defer indicesBldr.Release()
+                       for i := bm.numRows - 1; i >= 0; i-- {
+                               indicesBldr.Append(i)
+                       }
+                       indices := indicesBldr.NewArray()
+                       defer indices.Release()
+
+                       // Reset timer after setup
+                       b.ResetTimer()
+                       b.ReportAllocs()
+
+                       // Run benchmark
+                       for i := 0; i < b.N; i++ {
+                               result, err := compute.TakeArray(ctx, values, indices)
+                               if err != nil {
+                                       b.Fatal(err)
+                               }
+                               result.Release()
+                       }
+
+                       // Report throughput
+                       b.ReportMetric(float64(bm.numRows*int64(b.N))/b.Elapsed().Seconds(), "rows/sec")
+               })
+       }
+}
+
+func BenchmarkTakeStringPartitionPattern(b *testing.B) {
+       // Simulate real-world partitioning workload where data is reorganized
+       // into multiple partitions (e.g., by timestamp month + host)
+       mem := memory.NewGoAllocator()
+       ctx := compute.WithAllocator(context.Background(), mem)
+
+       const numRows = 50000
+       const numPartitions = 8
+       const avgStrLen = 20
+
+       // Create source data
+       bldr := array.NewLargeStringBuilder(mem)
+       defer bldr.Release()
+       for i := 0; i < numRows; i++ {
+               str := fmt.Sprintf("host_%d_path_%s", i%100, strings.Repeat("x", avgStrLen-15))
+               bldr.Append(str)
+       }
+       values := bldr.NewArray()
+       defer values.Release()
+
+       // Create indices that simulate partitioning by interleaving
+       // (every Nth row goes to partition N)
+       indicesBldr := array.NewInt64Builder(mem)
+       defer indicesBldr.Release()
+       for partition := 0; partition < numPartitions; partition++ {
+               for i := partition; i < numRows; i += numPartitions {
+                       indicesBldr.Append(int64(i))
+               }
+       }
+       indices := indicesBldr.NewArray()
+       defer indices.Release()
+
+       b.ResetTimer()
+       b.ReportAllocs()
+
+       for i := 0; i < b.N; i++ {
+               result, err := compute.TakeArray(ctx, values, indices)
+               if err != nil {
+                       b.Fatal(err)
+               }
+               result.Release()
+       }
+
+       b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
+
+func BenchmarkTakeMultiColumn(b *testing.B) {
+       // Benchmark Take on a record batch with multiple string columns
+       // to simulate real-world use cases (e.g., CloudFront logs with 20+ string columns)
+       mem := memory.NewGoAllocator()
+       ctx := compute.WithAllocator(context.Background(), mem)
+
+       const numRows = 50000
+       const numStringCols = 20
+       const avgStrLen = 15
+
+       // Build schema with multiple string columns
+       fields := make([]arrow.Field, numStringCols+3)
+       for i := 0; i < numStringCols; i++ {
+               fields[i] = arrow.Field{
+                       Name: fmt.Sprintf("string_col_%d", i),
+                       Type: arrow.BinaryTypes.LargeString,
+               }
+       }
+       fields[numStringCols] = arrow.Field{Name: "int_col", Type: arrow.PrimitiveTypes.Int64}
+       fields[numStringCols+1] = arrow.Field{Name: "float_col", Type: arrow.PrimitiveTypes.Float64}
+       fields[numStringCols+2] = arrow.Field{Name: "timestamp_col", Type: arrow.FixedWidthTypes.Timestamp_us}
+       schema := arrow.NewSchema(fields, nil)
+
+       // Build arrays
+       arrays := make([]arrow.Array, len(fields))
+       defer func() {
+               for _, arr := range arrays {
+                       if arr != nil {
+                               arr.Release()
+                       }
+               }
+       }()
+
+       // Create string columns
+       for col := 0; col < numStringCols; col++ {
+               bldr := array.NewLargeStringBuilder(mem)
+               for i := 0; i < numRows; i++ {
+                       str := fmt.Sprintf("col%d_val%d_%s", col, i, strings.Repeat("x", avgStrLen-15))
+                       bldr.Append(str)
+               }
+               arrays[col] = bldr.NewArray()
+               bldr.Release()
+       }
+
+       // Create numeric columns
+       intBldr := array.NewInt64Builder(mem)
+       floatBldr := array.NewFloat64Builder(mem)
+       tsBldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp_us.(*arrow.TimestampType))
+       for i := 0; i < numRows; i++ {
+               intBldr.Append(int64(i))
+               floatBldr.Append(float64(i) * 1.5)
+               tsBldr.Append(arrow.Timestamp(1700000000000000 + int64(i)*1000000))
+       }
+       arrays[numStringCols] = intBldr.NewArray()
+       arrays[numStringCols+1] = floatBldr.NewArray()
+       arrays[numStringCols+2] = tsBldr.NewArray()
+       intBldr.Release()
+       floatBldr.Release()
+       tsBldr.Release()
+
+       // Create record batch
+       batch := array.NewRecordBatch(schema, arrays, numRows)
+       defer batch.Release()
+
+       // Create indices for partitioning pattern
+       indicesBldr := array.NewInt64Builder(mem)
+       defer indicesBldr.Release()
+       // Reverse order to maximize data movement
+       for i := numRows - 1; i >= 0; i-- {
+               indicesBldr.Append(int64(i))
+       }
+       indices := indicesBldr.NewArray()
+       defer indices.Release()
+
+       b.ResetTimer()
+       b.ReportAllocs()
+
+       for i := 0; i < b.N; i++ {
+               batchDatum := compute.NewDatum(batch)
+               indicesDatum := compute.NewDatum(indices)
+               result, err := compute.Take(ctx, *compute.DefaultTakeOptions(), batchDatum, indicesDatum)
+               if err != nil {
+                       b.Fatal(err)
+               }
+               result.Release()
+               batchDatum.Release()
+               indicesDatum.Release()
+       }
+
+       b.ReportMetric(float64(numRows*b.N)/b.Elapsed().Seconds(), "rows/sec")
+}
