This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 52dbce56 fix(table): fix refcount leak in
enrichRecordsWithPosDeleteFields (#762)
52dbce56 is described below
commit 52dbce56cc9a0978fa1af686f13e616c358110aa
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Wed Mar 4 22:08:23 2026 +0100
fix(table): fix refcount leak in enrichRecordsWithPosDeleteFields (#762)
Arrays returned by NewArray() have refcount=1. NewRecordBatch calls
Retain() on each column, bumping to refcount=2. Without an explicit
Release() on the temporary arrays, the count only drops back to 1 —
never to 0 — when the record batch is released by the caller, so the
arrays are never freed.
Fix by assigning NewArray() results to local variables and deferring
their Release(), so the lifecycle is: NewArray() -> refcount 1,
NewRecordBatch Retain() -> refcount 2, deferred Release() -> refcount 1
(owned by outData), caller releases outData -> refcount 0 -> freed.
Also extend TestEnrichRecordsWithPosDeleteFields to use
memory.NewCheckedAllocator with mem.AssertSize(t, 0) to catch this class
of leak going forward.
Fixes leak introduced in #721.
---
table/arrow_scanner.go | 29 ++++++++++++-----------------
table/arrow_scanner_test.go | 10 ++++++----
2 files changed, 18 insertions(+), 21 deletions(-)
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 6621f6a4..30d56477 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -196,43 +196,38 @@ func processPositionalDeletes(ctx context.Context,
deletes set[int64]) recProces
// preserve the original position of those records.
func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath
iceberg.DataFile) recProcessFn {
nextIdx, mem := int64(0), compute.GetAllocator(ctx)
- filePathField, ok :=
PositionalDeleteArrowSchema.FieldsByName("file_path")
- if !ok {
- panic("position delete schema should have required field
'file_path'")
- }
- posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos")
- if !ok {
- panic("position delete schema should have required field 'pos'")
- }
return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err
error) {
defer inData.Release()
schema := inData.Schema()
fieldIdx := schema.NumFields()
- schema, err = schema.AddField(fieldIdx, filePathField[0])
+ schema, err = schema.AddField(fieldIdx,
PositionalDeleteArrowSchema.Field(0))
if err != nil {
return nil, err
}
- schema, err = schema.AddField(fieldIdx+1, posField[0])
+ schema, err = schema.AddField(fieldIdx+1,
PositionalDeleteArrowSchema.Field(1))
if err != nil {
return nil, err
}
- filePathBuilder := array.NewStringBuilder(mem)
- defer filePathBuilder.Release()
- posBuilder := array.NewInt64Builder(mem)
- defer posBuilder.Release()
+ rb := array.NewRecordBuilder(mem, PositionalDeleteArrowSchema)
+ defer rb.Release()
+
+ filePathBldr, posBldr := rb.Field(0).(*array.StringBuilder),
rb.Field(1).(*array.Int64Builder)
startPos := nextIdx
nextIdx += inData.NumRows()
for i := startPos; i < nextIdx; i++ {
- filePathBuilder.Append(filePath.FilePath())
- posBuilder.Append(i)
+ filePathBldr.Append(filePath.FilePath())
+ posBldr.Append(i)
}
- columns := append(inData.Columns(), filePathBuilder.NewArray(),
posBuilder.NewArray())
+ newCols := rb.NewRecordBatch()
+ defer newCols.Release()
+
+ columns := append(inData.Columns(), newCols.Column(0),
newCols.Column(1))
outData = array.NewRecordBatch(schema, columns,
inData.NumRows())
return outData, err
diff --git a/table/arrow_scanner_test.go b/table/arrow_scanner_test.go
index 58e34322..5f9bea49 100644
--- a/table/arrow_scanner_test.go
+++ b/table/arrow_scanner_test.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -85,19 +86,19 @@ func TestEnrichRecordsWithPosDeleteFields(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ ctx := compute.WithAllocator(t.Context(), mem)
+ defer mem.AssertSize(t, 0)
defer func() {
for _, b := range tc.inputBatches {
b.Release()
}
}()
- enrichFn :=
enrichRecordsWithPosDeleteFields(t.Context(), &mockDataFile{path:
"file://test_path.parquet"})
+ enrichFn := enrichRecordsWithPosDeleteFields(ctx,
&mockDataFile{path: "file://test_path.parquet"})
for i, b := range tc.inputBatches {
out, err := enrichFn(b)
require.NoError(t, err)
- defer func() {
- out.Release()
- }()
assert.Equal(t, schemaWithPosDelete,
out.Schema())
assert.Equal(t, out.NumRows(), b.NumRows())
@@ -109,6 +110,7 @@ func TestEnrichRecordsWithPosDeleteFields(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, string(expectedOutputJSON),
string(outAsJSON))
+ out.Release()
}
})
}