This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 64e07d94 fix(arrow/cdata): fix leaks identified by leak-sanitizer (#603)
64e07d94 is described below

commit 64e07d94c8b11a35cec99a7b3325f83952d9fbea
Author: Matt Topol <[email protected]>
AuthorDate: Tue Dec 9 11:08:18 2025 -0500

    fix(arrow/cdata): fix leaks identified by leak-sanitizer (#603)
    
    ### Rationale for this change
    fixes #597
    
    ### What changes are included in this PR?
    A series of calls to release, runtime.GC, free, and so on were added to
    address all of the leaks identified by the Go 1.25 leak sanitizer.
    
    ### Are these changes tested?
    Go 1.25 is added to the workflows so that the tests run with ASAN, which
    will catch these sanitizer issues in the future.
    
    ### Are there any user-facing changes?
    only the fixes
---
 .github/workflows/test.yml            | 13 +++++++++++++
 arrow/cdata/cdata.go                  |  1 +
 arrow/cdata/cdata_exports.go          | 10 +++++-----
 arrow/cdata/cdata_fulltest.c          |  5 +++++
 arrow/cdata/cdata_test.go             | 22 +++++++++++++++++++++-
 arrow/cdata/cdata_test_framework.go   |  9 +++++++++
 arrow/cdata/exports.go                | 28 +++++++++++++++++-----------
 arrow/memory/mallocator/mallocator.go |  9 ++++++++-
 8 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 5f6529fa..f896fc2a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -46,6 +46,14 @@ jobs:
             arch: arm64v8
             go: 1.24
             runs-on: ubuntu-24.04-arm
+          - arch-label: AMD64
+            arch: amd64
+            go: 1.25
+            runs-on: ubuntu-latest
+          - arch-label: ARM64
+            arch: arm64v8
+            go: 1.25
+            runs-on: ubuntu-24.04-arm
     env:
       ARCH: ${{ matrix.arch }}
       GO: ${{ matrix.go }}
@@ -81,6 +89,7 @@ jobs:
       matrix:
         go:
           - '1.24'
+          - '1.25'
     env:
       GO: ${{ matrix.go }}
     steps:
@@ -120,6 +129,7 @@ jobs:
       matrix:
         go:
           - '1.24'
+          - '1.25'
     env:
       GO: ${{ matrix.go }}
     steps:
@@ -159,6 +169,7 @@ jobs:
       matrix:
         go:
           - '1.24'
+          - '1.25'
     steps:
       - name: Checkout
         uses: actions/checkout@c2d88d3ecc89a9ef08eebf45d9637801dcee7eb5 # v5.0.1
@@ -185,6 +196,7 @@ jobs:
       matrix:
         go:
           - '1.24'
+          - '1.25'
     env:
       ARROW_GO_TESTCGO: "1"
     steps:
@@ -218,6 +230,7 @@ jobs:
       matrix:
         go:
           - '1.24'
+          - '1.25'
     steps:
       - name: Checkout
         uses: actions/checkout@c2d88d3ecc89a9ef08eebf45d9637801dcee7eb5 # v5.0.1
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index f006e494..4085ed3d 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -937,6 +937,7 @@ func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) error
                return rdr.getError(int(errno))
        }
        defer C.ArrowSchemaRelease(&sc)
+
        s, err := ImportCArrowSchema((*CArrowSchema)(&sc))
        if err != nil {
                return err
diff --git a/arrow/cdata/cdata_exports.go b/arrow/cdata/cdata_exports.go
index 9020ddf1..1e8e9bf9 100644
--- a/arrow/cdata/cdata_exports.go
+++ b/arrow/cdata/cdata_exports.go
@@ -459,7 +459,7 @@ type cRecordReader struct {
        err *C.char
 }
 
-func (rr cRecordReader) getSchema(out *CArrowSchema) int {
+func (rr *cRecordReader) getSchema(out *CArrowSchema) int {
        schema := rr.rdr.Schema()
        if schema == nil {
                return rr.maybeError()
@@ -468,7 +468,7 @@ func (rr cRecordReader) getSchema(out *CArrowSchema) int {
        return 0
 }
 
-func (rr cRecordReader) next(out *CArrowArray) int {
+func (rr *cRecordReader) next(out *CArrowArray) int {
        if rr.rdr.Next() {
                ExportArrowRecordBatch(rr.rdr.RecordBatch(), out, nil)
                return 0
@@ -477,7 +477,7 @@ func (rr cRecordReader) next(out *CArrowArray) int {
        return rr.maybeError()
 }
 
-func (rr cRecordReader) maybeError() int {
+func (rr *cRecordReader) maybeError() int {
        err := rr.rdr.Err()
        if err != nil {
                return C.EIO
@@ -485,7 +485,7 @@ func (rr cRecordReader) maybeError() int {
        return 0
 }
 
-func (rr cRecordReader) getLastError() *C.char {
+func (rr *cRecordReader) getLastError() *C.char {
        err := rr.rdr.Err()
        if err != nil {
                if rr.err != nil {
@@ -496,7 +496,7 @@ func (rr cRecordReader) getLastError() *C.char {
        return rr.err
 }
 
-func (rr cRecordReader) release() {
+func (rr *cRecordReader) release() {
        if rr.err != nil {
                C.free(unsafe.Pointer(rr.err))
        }
diff --git a/arrow/cdata/cdata_fulltest.c b/arrow/cdata/cdata_fulltest.c
index b4d94132..d9e67e93 100644
--- a/arrow/cdata/cdata_fulltest.c
+++ b/arrow/cdata/cdata_fulltest.c
@@ -118,6 +118,8 @@ static void release_nested_internal(struct ArrowSchema* schema,
     if (is_dynamic) {
         free((void*)schema->format);
         free((void*)schema->name);
+    } else {
+        free(schema->children);
     }
     ArrowSchemaMarkReleased(schema);
 }
@@ -161,6 +163,7 @@ void test_primitive(struct ArrowSchema* schema, const char* fmt) {
 // Since test_lists et al. allocate an entirely array of ArrowSchema pointers,
 // need to expose a function to free it.
 void free_malloced_schemas(struct ArrowSchema** schemas) {
+    free(schemas[0]);
     free(schemas);
 }
 
@@ -325,6 +328,7 @@ static void release_stream(struct ArrowArrayStream* st) {
 static void release_the_array(struct ArrowArray* out) {
     for (int i = 0; i < out->n_children; ++i) {
         ArrowArrayRelease(out->children[i]);
+        free(out->children[i]);
     }
     free((void*)out->children);
     free(out->buffers);
@@ -454,6 +458,7 @@ int test_exported_stream(struct ArrowArrayStream* stream) {
       stream->release(stream);
       break;
     }
+    array.release(&array);
   }
   return 0;
 }
diff --git a/arrow/cdata/cdata_test.go b/arrow/cdata/cdata_test.go
index 169ed848..170a5151 100644
--- a/arrow/cdata/cdata_test.go
+++ b/arrow/cdata/cdata_test.go
@@ -50,6 +50,7 @@ func TestSchemaExport(t *testing.T) {
        sc := exportInt32TypeSchema()
        f, err := importSchema(&sc)
        assert.NoError(t, err)
+       defer ReleaseCArrowSchema(&sc)
 
        keys, _ := getMetadataKeys()
        vals, _ := getMetadataValues()
@@ -66,6 +67,7 @@ func TestSimpleArrayExport(t *testing.T) {
        assert.False(t, test1IsReleased())
 
        testarr := exportInt32Array()
+       defer freeCArray(testarr)
        arr, err := ImportCArrayWithType(testarr, arrow.PrimitiveTypes.Int32)
        assert.NoError(t, err)
 
@@ -79,7 +81,9 @@ func TestSimpleArrayExport(t *testing.T) {
 
 func TestSimpleArrayAndSchema(t *testing.T) {
        sc := exportInt32TypeSchema()
+       defer ReleaseCArrowSchema(&sc)
        testarr := exportInt32Array()
+       defer freeCArray(testarr)
 
        // grab address of the buffer we stuck into the ArrowArray object
        buflist := (*[2]unsafe.Pointer)(unsafe.Pointer(testarr.buffers))
@@ -89,6 +93,7 @@ func TestSimpleArrayAndSchema(t *testing.T) {
        assert.NoError(t, err)
        assert.Equal(t, arrow.PrimitiveTypes.Int32, fld.Type)
        assert.EqualValues(t, 10, arr.Len())
+       defer arr.Release()
 
        // verify that the address is the same of the first integer for the
        // slice that is being used by the arrow.Array and the original buffer
@@ -782,6 +787,7 @@ func TestRecordBatch(t *testing.T) {
 
 func TestRecordReaderStream(t *testing.T) {
        stream := arrayStreamTest()
+       defer releaseStreamTest(stream)
        defer ReleaseCArrowArrayStream(stream)
 
        rdr := ImportCArrayStream(stream, nil)
@@ -806,6 +812,7 @@ func TestRecordReaderStream(t *testing.T) {
                assert.Equal(t, "bar", rec.Column(1).(*array.String).Value(1))
                assert.Equal(t, "baz", rec.Column(1).(*array.String).Value(2))
        }
+       runtime.GC()
 }
 
 func TestExportRecordReaderStream(t *testing.T) {
@@ -813,6 +820,7 @@ func TestExportRecordReaderStream(t *testing.T) {
        rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist)
 
        out := createTestStreamObj()
+       defer releaseStreamTest(out)
        ExportRecordReader(rdr, out)
 
        assert.NotNil(t, out.get_schema)
@@ -822,7 +830,7 @@ func TestExportRecordReaderStream(t *testing.T) {
        assert.NotNil(t, out.private_data)
 
        h := *(*cgo.Handle)(out.private_data)
-       assert.Same(t, rdr, h.Value().(cRecordReader).rdr)
+       assert.Same(t, rdr, h.Value().(*cRecordReader).rdr)
 
        importedRdr := ImportCArrayStream(out, nil)
        i := 0
@@ -862,6 +870,7 @@ func TestExportRecordReaderStreamLifetime(t *testing.T) {
        defer rdr.Release()
 
        out := createTestStreamObj()
+       defer releaseStreamTest(out)
        ExportRecordReader(rdr, out)
 
        // C Stream is holding on to memory
@@ -878,6 +887,7 @@ func TestEmptyListExport(t *testing.T) {
 
        var out CArrowArray
        ExportArrowArray(arr, &out, nil)
+       defer ReleaseCArrowArray(&out)
 
        assert.Zero(t, out.length)
        assert.Zero(t, out.null_count)
@@ -898,6 +908,8 @@ func TestEmptyDictExport(t *testing.T) {
        var out CArrowArray
        var sc CArrowSchema
        ExportArrowArray(arr, &out, &sc)
+       defer ReleaseCArrowArray(&out)
+       defer ReleaseCArrowSchema(&sc)
 
        assert.EqualValues(t, 'c', *sc.format)
        assert.NotZero(t, sc.flags&1)
@@ -933,6 +945,8 @@ func TestEmptyStringExport(t *testing.T) {
        var out CArrowArray
        var sc CArrowSchema
        ExportArrowArray(arr, &out, &sc)
+       defer ReleaseCArrowArray(&out)
+       defer ReleaseCArrowSchema(&sc)
 
        assert.EqualValues(t, 'u', *sc.format)
        assert.Zero(t, sc.n_children)
@@ -958,6 +972,8 @@ func TestEmptyUnionExport(t *testing.T) {
        var out CArrowArray
        var sc CArrowSchema
        ExportArrowArray(arr, &out, &sc)
+       defer ReleaseCArrowArray(&out)
+       defer ReleaseCArrowSchema(&sc)
 
        assert.EqualValues(t, 1, sc.n_children)
        assert.Nil(t, sc.dictionary)
@@ -1015,18 +1031,21 @@ func TestRecordReaderError(t *testing.T) {
                t.Fatalf("Expected error but got none")
        }
        assert.Contains(t, err.Error(), "Expected error message")
+       runtime.GC()
 
        err = roundTripStreamTest(&failingReader{opCount: 2})
        if err == nil {
                t.Fatalf("Expected error but got none")
        }
        assert.Contains(t, err.Error(), "Expected error message")
+       runtime.GC()
 
        err = roundTripStreamTest(&failingReader{opCount: 3})
        if err == nil {
                t.Fatalf("Expected error but got none")
        }
        assert.Contains(t, err.Error(), "Expected error message")
+       runtime.GC()
 }
 
 func TestRecordReaderImportError(t *testing.T) {
@@ -1061,6 +1080,7 @@ func TestConfuseGoGc(t *testing.T) {
                                assert.NoError(t, err)
                                runtime.GC()
                                assert.NoError(t, confuseGoGc(rdr))
+                               rdr.Release()
                                runtime.GC()
                        }
                        wg.Done()
diff --git a/arrow/cdata/cdata_test_framework.go b/arrow/cdata/cdata_test_framework.go
index fef18e5d..3e2c5bc8 100644
--- a/arrow/cdata/cdata_test_framework.go
+++ b/arrow/cdata/cdata_test_framework.go
@@ -123,6 +123,10 @@ func exportInt32Array() *CArrowArray {
        return arr
 }
 
+func freeCArray(arr *CArrowArray) {
+       C.free(unsafe.Pointer(arr))
+}
+
 func isReleased(arr *CArrowArray) bool {
        return C.ArrowArrayIsReleased(arr) == 1
 }
@@ -408,6 +412,10 @@ func arrayStreamTest() *CArrowArrayStream {
        return st
 }
 
+func releaseStreamTest(st *CArrowArrayStream) {
+       C.free(unsafe.Pointer(st))
+}
+
 func exportedStreamTest(reader array.RecordReader) error {
        out := C.get_test_stream()
        ExportRecordReader(reader, out)
@@ -421,6 +429,7 @@ func exportedStreamTest(reader array.RecordReader) error {
 
 func roundTripStreamTest(reader array.RecordReader) error {
        out := C.get_test_stream()
+       defer C.free(unsafe.Pointer(out))
        ExportRecordReader(reader, out)
        rdr, err := ImportCRecordReader(out, nil)
 
diff --git a/arrow/cdata/exports.go b/arrow/cdata/exports.go
index c83b6f82..6367a5b3 100644
--- a/arrow/cdata/exports.go
+++ b/arrow/cdata/exports.go
@@ -84,15 +84,15 @@ func releaseExportedSchema(schema *CArrowSchema) {
        C.free(unsafe.Pointer(schema.format))
        C.free(unsafe.Pointer(schema.metadata))
 
-       if schema.n_children == 0 {
-               return
-       }
-
        if schema.dictionary != nil {
                C.ArrowSchemaRelease(schema.dictionary)
                C.free(unsafe.Pointer(schema.dictionary))
        }
 
+       if schema.n_children == 0 {
+               return
+       }
+
        children := unsafe.Slice(schema.children, schema.n_children)
        for _, c := range children {
                C.ArrowSchemaRelease(c)
@@ -153,28 +153,28 @@ func releaseExportedArray(arr *CArrowArray) {
 //export streamGetSchema
 func streamGetSchema(handle *CArrowArrayStream, out *CArrowSchema) C.int {
        h := getHandle(handle.private_data)
-       rdr := h.Value().(cRecordReader)
+       rdr := h.Value().(*cRecordReader)
        return C.int(rdr.getSchema(out))
 }
 
 //export streamGetNext
 func streamGetNext(handle *CArrowArrayStream, out *CArrowArray) C.int {
        h := getHandle(handle.private_data)
-       rdr := h.Value().(cRecordReader)
+       rdr := h.Value().(*cRecordReader)
        return C.int(rdr.next(out))
 }
 
 //export streamGetError
 func streamGetError(handle *CArrowArrayStream) *C.cchar_t {
        h := getHandle(handle.private_data)
-       rdr := h.Value().(cRecordReader)
+       rdr := h.Value().(*cRecordReader)
        return rdr.getLastError()
 }
 
 //export streamRelease
 func streamRelease(handle *CArrowArrayStream) {
        h := getHandle(handle.private_data)
-       h.Value().(cRecordReader).release()
+       h.Value().(*cRecordReader).release()
        h.Delete()
        C.free(unsafe.Pointer(handle.private_data))
        handle.release = nil
@@ -187,7 +187,7 @@ func exportStream(rdr array.RecordReader, out *CArrowArrayStream) {
        out.get_last_error = (*[0]byte)(C.streamGetError)
        out.release = (*[0]byte)(C.streamRelease)
        rdr.Retain()
-       h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
+       h := cgo.NewHandle(&cRecordReader{rdr: rdr, err: nil})
        out.private_data = createHandle(h)
 }
 
@@ -206,6 +206,9 @@ type taskState struct {
 //export asyncStreamOnSchema
 func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema *CArrowSchema) C.int {
        h := getHandle(self.private_data)
+       defer C.free(self.private_data)
+       defer h.Delete()
+
        handler := h.Value().(cAsyncState)
        defer close(handler.ch)
 
@@ -235,7 +238,6 @@ func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema *CArrowSch
                ctx:       handler.ctx,
                taskQueue: taskQueue,
        }))
-       defer h.Delete()
 
        C.goCallRequest(self.producer, C.int64_t(handler.queueSize))
        go asyncTaskQueue(handler.ctx, sc, recordStream, taskQueue, self.producer)
@@ -347,7 +349,10 @@ func exportAsyncProducer(schema *arrow.Schema, stream <-chan RecordMessage, hand
        }()
 
        producer := C.get_producer()
-       defer C.free(unsafe.Pointer(producer))
+       defer func() {
+               C.free(producer.private_data)
+               C.free(unsafe.Pointer(producer))
+       }()
 
        producer.device_type = C.ARROW_DEVICE_CPU
        producer.request = (*[0]byte)(C.asyncProducerRequest)
@@ -410,6 +415,7 @@ func exportAsyncProducer(schema *arrow.Schema, stream <-chan RecordMessage, hand
                                if status != C.int(0) {
                                        msg.Record.Release()
                                        getHandle(task.private_data).Delete()
+                                       C.free(task.private_data)
                                        return fmt.Errorf("on_next_task failed with status %d", status)
                                }
                        default:
diff --git a/arrow/memory/mallocator/mallocator.go b/arrow/memory/mallocator/mallocator.go
index 994cfdb2..45959ee8 100644
--- a/arrow/memory/mallocator/mallocator.go
+++ b/arrow/memory/mallocator/mallocator.go
@@ -88,7 +88,14 @@ func (alloc *Mallocator) Allocate(size int) []byte {
 
        buf := unsafe.Slice((*byte)(ptr), paddedSize)
        aligned := roundToPowerOf2(uintptr(ptr), uintptr(alloc.alignment))
-       alloc.realAllocations.Store(aligned, uintptr(ptr))
+       if size == 0 {
+               // if we return a zero-sized slice, we need to store the actual ptr
+               // as the key in the map, since the returned slice will have the actual
+               // pointer value instead of the aligned pointer.
+               alloc.realAllocations.Store(uintptr(ptr), uintptr(ptr))
+       } else {
+               alloc.realAllocations.Store(aligned, uintptr(ptr))
+       }
        atomic.AddUint64(&alloc.allocatedBytes, uint64(size))
 
        if uintptr(ptr) != aligned {

Reply via email to