This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 52dbce56 fix(table): fix refcount leak in enrichRecordsWithPosDeleteFields (#762)
52dbce56 is described below

commit 52dbce56cc9a0978fa1af686f13e616c358110aa
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Wed Mar 4 22:08:23 2026 +0100

    fix(table): fix refcount leak in enrichRecordsWithPosDeleteFields (#762)
    
    Arrays returned by NewArray() have refcount=1. NewRecordBatch calls
    Retain() on each column, bumping to refcount=2. Without an explicit
    Release() on the temporary arrays, the count only drops back to 1 (never
    to 0) when the record batch is released by the caller, so the arrays are
    never freed.
    
    Fix by building the two extra columns with a RecordBuilder and deferring
    Release() on both the builder and the temporary record batch it produces,
    so the lifecycle is: rb.NewRecordBatch() -> columns at refcount 1,
    NewRecordBatch Retain() -> refcount 2, deferred Release() of the temporary
    batch -> refcount 1 (owned by outData), caller releases outData ->
    refcount 0 -> freed.
    
    Also extend TestEnrichRecordsWithPosDeleteFields to use
    memory.NewCheckedAllocator with mem.AssertSize(t, 0) to catch this class
    of leak going forward.
    
    Fixes leak introduced in #721.
---
 table/arrow_scanner.go      | 29 ++++++++++++-----------------
 table/arrow_scanner_test.go | 10 ++++++----
 2 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 6621f6a4..30d56477 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -196,43 +196,38 @@ func processPositionalDeletes(ctx context.Context, 
deletes set[int64]) recProces
 // preserve the original position of those records.
 func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath 
iceberg.DataFile) recProcessFn {
        nextIdx, mem := int64(0), compute.GetAllocator(ctx)
-       filePathField, ok := 
PositionalDeleteArrowSchema.FieldsByName("file_path")
-       if !ok {
-               panic("position delete schema should have required field 
'file_path'")
-       }
-       posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos")
-       if !ok {
-               panic("position delete schema should have required field 'pos'")
-       }
 
        return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err 
error) {
                defer inData.Release()
 
                schema := inData.Schema()
                fieldIdx := schema.NumFields()
-               schema, err = schema.AddField(fieldIdx, filePathField[0])
+               schema, err = schema.AddField(fieldIdx, 
PositionalDeleteArrowSchema.Field(0))
                if err != nil {
                        return nil, err
                }
-               schema, err = schema.AddField(fieldIdx+1, posField[0])
+               schema, err = schema.AddField(fieldIdx+1, 
PositionalDeleteArrowSchema.Field(1))
                if err != nil {
                        return nil, err
                }
 
-               filePathBuilder := array.NewStringBuilder(mem)
-               defer filePathBuilder.Release()
-               posBuilder := array.NewInt64Builder(mem)
-               defer posBuilder.Release()
+               rb := array.NewRecordBuilder(mem, PositionalDeleteArrowSchema)
+               defer rb.Release()
+
+               filePathBldr, posBldr := rb.Field(0).(*array.StringBuilder), 
rb.Field(1).(*array.Int64Builder)
 
                startPos := nextIdx
                nextIdx += inData.NumRows()
 
                for i := startPos; i < nextIdx; i++ {
-                       filePathBuilder.Append(filePath.FilePath())
-                       posBuilder.Append(i)
+                       filePathBldr.Append(filePath.FilePath())
+                       posBldr.Append(i)
                }
 
-               columns := append(inData.Columns(), filePathBuilder.NewArray(), 
posBuilder.NewArray())
+               newCols := rb.NewRecordBatch()
+               defer newCols.Release()
+
+               columns := append(inData.Columns(), newCols.Column(0), 
newCols.Column(1))
                outData = array.NewRecordBatch(schema, columns, 
inData.NumRows())
 
                return outData, err
diff --git a/table/arrow_scanner_test.go b/table/arrow_scanner_test.go
index 58e34322..5f9bea49 100644
--- a/table/arrow_scanner_test.go
+++ b/table/arrow_scanner_test.go
@@ -24,6 +24,7 @@ import (
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -85,19 +86,19 @@ func TestEnrichRecordsWithPosDeleteFields(t *testing.T) {
 
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
+                       mem := 
memory.NewCheckedAllocator(memory.DefaultAllocator)
+                       ctx := compute.WithAllocator(t.Context(), mem)
+                       defer mem.AssertSize(t, 0)
                        defer func() {
                                for _, b := range tc.inputBatches {
                                        b.Release()
                                }
                        }()
 
-                       enrichFn := 
enrichRecordsWithPosDeleteFields(t.Context(), &mockDataFile{path: 
"file://test_path.parquet"})
+                       enrichFn := enrichRecordsWithPosDeleteFields(ctx, 
&mockDataFile{path: "file://test_path.parquet"})
                        for i, b := range tc.inputBatches {
                                out, err := enrichFn(b)
                                require.NoError(t, err)
-                               defer func() {
-                                       out.Release()
-                               }()
 
                                assert.Equal(t, schemaWithPosDelete, 
out.Schema())
                                assert.Equal(t, out.NumRows(), b.NumRows())
@@ -109,6 +110,7 @@ func TestEnrichRecordsWithPosDeleteFields(t *testing.T) {
                                require.NoError(t, err)
 
                                assert.Equal(t, string(expectedOutputJSON), 
string(outAsJSON))
+                               out.Release()
                        }
                })
        }

Reply via email to