This is an automated email from the ASF dual-hosted git repository. curth pushed a commit to branch refcount in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git
commit 0c95a71ff70561bada0aeed98bb6bdfbc544e7db Author: Curt Hagenlocher <[email protected]> AuthorDate: Sun Mar 22 09:03:32 2026 -0700 Add reference counting to ArrayData to enable sharing of partial values. --- src/Apache.Arrow/Arrays/ArrayData.cs | 93 ++++ src/Apache.Arrow/ArrowBuffer.cs | 23 +- src/Apache.Arrow/C/CArrowArrayExporter.cs | 21 +- src/Apache.Arrow/Memory/ExportedAllocationOwner.cs | 15 + src/Apache.Arrow/Memory/IOwnableAllocation.cs | 24 - src/Apache.Arrow/Memory/NativeMemoryManager.cs | 23 +- .../ArrayDataReferenceCountingTests.cs | 510 +++++++++++++++++++++ .../CDataInterfacePythonTests.cs | 95 ++-- test/Apache.Arrow.Tests/TestMemoryAllocator.cs | 4 +- 9 files changed, 674 insertions(+), 134 deletions(-) diff --git a/src/Apache.Arrow/Arrays/ArrayData.cs b/src/Apache.Arrow/Arrays/ArrayData.cs index 25cda4d..ab4c13f 100644 --- a/src/Apache.Arrow/Arrays/ArrayData.cs +++ b/src/Apache.Arrow/Arrays/ArrayData.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using Apache.Arrow.Memory; using Apache.Arrow.Types; @@ -25,6 +26,9 @@ namespace Apache.Arrow { private const int RecalculateNullCount = -1; + private int _referenceCount = 1; + private readonly ArrayData _parent; // non-null for slices; the parent owns the buffers + public readonly IArrowType DataType; public readonly int Length; @@ -97,8 +101,52 @@ namespace Apache.Arrow Dictionary = dictionary; } + private ArrayData( + ArrayData parent, + IArrowType dataType, + int length, int nullCount, int offset, + ArrowBuffer[] buffers, ArrayData[] children, ArrayData dictionary) + { + _parent = parent; + DataType = dataType ?? NullType.Default; + Length = length; + NullCount = nullCount; + Offset = offset; + Buffers = buffers; + Children = children; + Dictionary = dictionary; + } + + /// <summary> + /// Increment the reference count, allowing this ArrayData to be shared + /// across multiple owners. Each call to Acquire must be balanced by a + /// call to <see cref="Dispose"/>. + /// </summary> + /// <returns>This instance.</returns> + public ArrayData Acquire() + { + if (Interlocked.Increment(ref _referenceCount) <= 1) + { + Interlocked.Decrement(ref _referenceCount); + throw new ObjectDisposedException(nameof(ArrayData)); + } + return this; + } + public void Dispose() { + if (Interlocked.Decrement(ref _referenceCount) != 0) + { + return; + } + + if (_parent != null) + { + // This is a slice — the parent owns the buffers, children, and dictionary. + _parent.Dispose(); + return; + } + if (Buffers != null) { foreach (ArrowBuffer buffer in Buffers) @@ -118,6 +166,13 @@ namespace Apache.Arrow Dictionary?.Dispose(); } + /// <summary> + /// Slice this ArrayData without ownership tracking. The returned slice shares + /// the underlying buffers but does not keep them alive — the caller must ensure + /// the original ArrayData outlives the slice. + /// Consider using <see cref="SliceShared"/> instead, which uses reference counting + /// to keep the underlying buffers alive for the lifetime of the slice. + /// </summary> public ArrayData Slice(int offset, int length) { if (offset > Length) @@ -149,6 +204,44 @@ namespace Apache.Arrow return new ArrayData(DataType, length, nullCount, offset, Buffers, Children, Dictionary); } + /// <summary> + /// Slice this ArrayData with shared ownership. The returned slice keeps the + /// underlying buffers alive via reference counting. The caller must dispose the + /// returned ArrayData when done. + /// </summary> + public ArrayData SliceShared(int offset, int length) + { + if (offset > Length) + { + throw new ArgumentException($"Offset {offset} cannot be greater than Length {Length} for Array.SliceShared"); + } + + length = Math.Min(Length - offset, length); + offset += Offset; + + int nullCount; + if (NullCount == 0) + { + nullCount = 0; + } + else if (NullCount == Length) + { + nullCount = length; + } + else if (offset == Offset && length == Length) + { + nullCount = NullCount; + } + else + { + nullCount = RecalculateNullCount; + } + + var root = _parent ?? this; + root.Acquire(); + return new ArrayData(root, DataType, length, nullCount, offset, Buffers, Children, Dictionary); + } + public ArrayData Clone(MemoryAllocator allocator = default) { return new ArrayData( diff --git a/src/Apache.Arrow/ArrowBuffer.cs b/src/Apache.Arrow/ArrowBuffer.cs index 28a06c3..1259674 100644 --- a/src/Apache.Arrow/ArrowBuffer.cs +++ b/src/Apache.Arrow/ArrowBuffer.cs @@ -16,7 +16,6 @@ using System; using System.Buffers; using System.Runtime.CompilerServices; -using Apache.Arrow.C; using Apache.Arrow.Memory; namespace Apache.Arrow @@ -83,22 +82,12 @@ namespace Apache.Arrow return true; } - if (_memoryOwner is IOwnableAllocation ownable && ownable.TryAcquire(out ptr, out int offset, out int length)) - { - newOwner.Acquire(ptr, offset, length); - ptr += offset; - return true; - } - - if (_memoryOwner == null && CArrowArrayExporter.EnableManagedMemoryExport) - { - var handle = _memory.Pin(); - ptr = newOwner.Reference(handle); - return true; - } - - ptr = IntPtr.Zero; - return false; + // Pin the memory and let the ExportedAllocationOwner track the handle. + // The caller is responsible for keeping the underlying ArrayData alive + // (via AddReference) so the memory owner is not disposed while pinned. + var handle = Memory.Pin(); + ptr = newOwner.Reference(handle); + return true; } } } diff --git a/src/Apache.Arrow/C/CArrowArrayExporter.cs b/src/Apache.Arrow/C/CArrowArrayExporter.cs index 0e140b5..9195df3 100644 --- a/src/Apache.Arrow/C/CArrowArrayExporter.cs +++ b/src/Apache.Arrow/C/CArrowArrayExporter.cs @@ -27,9 +27,10 @@ namespace Apache.Arrow.C public static class CArrowArrayExporter { /// <summary> - /// Experimental feature to enable exporting managed memory to CArrowArray. Use with caution. + /// Formerly-experimental feature to enable exporting managed memory to CArrowArray. Now obsolete. /// </summary> - public static bool EnableManagedMemoryExport = false; + [Obsolete] + public static bool EnableManagedMemoryExport; #if NET5_0_OR_GREATER private static unsafe delegate* unmanaged<CArrowArray*, void> ReleaseArrayPtr => &ReleaseArray; @@ -39,9 +40,8 @@ namespace Apache.Arrow.C private static IntPtr ReleaseArrayPtr => s_releaseArray.Pointer; #endif /// <summary> - /// Export an <see cref="IArrowArray"/> to a <see cref="CArrowArray"/>. Whether or not the - /// export succeeds, the original array becomes invalid. Clone an array to continue using it - /// after a copy has been exported. + /// Export an <see cref="IArrowArray"/> to a <see cref="CArrowArray"/>. The original array + /// remains valid after export because the exported data is kept alive via reference counting. /// </summary> /// <param name="array">The array to export</param> /// <param name="cArray">An allocated but uninitialized CArrowArray pointer.</param> @@ -76,9 +76,8 @@ namespace Apache.Arrow.C } /// <summary> - /// Export a <see cref="RecordBatch"/> to a <see cref="CArrowArray"/>. Whether or not the - /// export succeeds, the original record batch becomes invalid. Clone the batch to continue using it - /// after a copy has been exported. + /// Export a <see cref="RecordBatch"/> to a <see cref="CArrowArray"/>. The original record batch + /// remains valid after export because the exported data is kept alive via reference counting. /// </summary> /// <param name="batch">The record batch to export</param> /// <param name="cArray">An allocated but uninitialized CArrowArray pointer.</param> @@ -118,6 +117,8 @@ namespace Apache.Arrow.C private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, ArrayData array, CArrowArray* cArray) { + sharedOwner.AddReference(array); + cArray->length = array.Length; cArray->offset = array.Offset; cArray->null_count = array.NullCount; // The C Data interface allows the null count to be -1 @@ -194,10 +195,6 @@ namespace Apache.Arrow.C cArray->n_children = batch.ColumnCount; cArray->children = null; - // XXX sharing the same ExportedAllocationOwner for all columns - // and child arrays makes memory tracking inflexible. - // If the consumer keeps only a single record batch column, - // the entire record batch memory is nevertheless kept alive. if (cArray->n_children > 0) { cArray->children = (CArrowArray**)sharedOwner.Allocate(IntPtr.Size * batch.ColumnCount); diff --git a/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs b/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs index cd52f6d..83621c8 100644 --- a/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs +++ b/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs @@ -25,6 +25,7 @@ namespace Apache.Arrow.Memory { private readonly List<IntPtr> _pointers = new List<IntPtr>(); private readonly List<MemoryHandle> _handles = new List<MemoryHandle>(); + private readonly List<ArrayData> _arrayDataRefs = new List<ArrayData>(); private long _allocationSize; private long _referenceCount; private bool _disposed; @@ -53,6 +54,15 @@ namespace Apache.Arrow.Memory return new IntPtr(handle.Pointer); } + /// <summary> + /// Hold a reference to an ArrayData, keeping it alive until this owner is disposed. + /// </summary> + public void AddReference(ArrayData data) + { + data.Acquire(); + _arrayDataRefs.Add(data); + } + public void IncRef() { Interlocked.Increment(ref _referenceCount); @@ -88,6 +98,11 @@ namespace Apache.Arrow.Memory _handles[i] = default; } + for (int i = 0; i < _arrayDataRefs.Count; i++) + { + _arrayDataRefs[i].Dispose(); + } + GC.RemoveMemoryPressure(_allocationSize); GC.SuppressFinalize(this); _disposed = true; diff --git a/src/Apache.Arrow/Memory/IOwnableAllocation.cs b/src/Apache.Arrow/Memory/IOwnableAllocation.cs deleted file mode 100644 index a5e7565..0000000 --- a/src/Apache.Arrow/Memory/IOwnableAllocation.cs +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; - -namespace Apache.Arrow.Memory -{ - internal interface IOwnableAllocation - { - bool TryAcquire(out IntPtr ptr, out int offset, out int length); - } -} diff --git a/src/Apache.Arrow/Memory/NativeMemoryManager.cs b/src/Apache.Arrow/Memory/NativeMemoryManager.cs index 5fb51be..8bcb705 100644 --- a/src/Apache.Arrow/Memory/NativeMemoryManager.cs +++ b/src/Apache.Arrow/Memory/NativeMemoryManager.cs @@ -20,7 +20,7 @@ using System.Threading; namespace Apache.Arrow.Memory { - public class NativeMemoryManager : MemoryManager<byte>, IOwnableAllocation + public class NativeMemoryManager : MemoryManager<byte> { private IntPtr _ptr; private int _pinCount; @@ -91,27 +91,6 @@ namespace Apache.Arrow.Memory } } - bool IOwnableAllocation.TryAcquire(out IntPtr ptr, out int offset, out int length) - { - // TODO: implement refcounted buffers? - - if (object.ReferenceEquals(_owner, NativeMemoryAllocator.ExclusiveOwner)) - { - ptr = Interlocked.Exchange(ref _ptr, IntPtr.Zero); - if (ptr != IntPtr.Zero) - { - offset = _offset; - length = _length; - return true; - } - } - - ptr = IntPtr.Zero; - offset = 0; - length = 0; - return false; - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private unsafe void* CalculatePointer(int index) => (_ptr + _offset + index).ToPointer(); diff --git a/test/Apache.Arrow.Tests/ArrayDataReferenceCountingTests.cs b/test/Apache.Arrow.Tests/ArrayDataReferenceCountingTests.cs new file mode 100644 index 0000000..3477f3b --- /dev/null +++ b/test/Apache.Arrow.Tests/ArrayDataReferenceCountingTests.cs @@ -0,0 +1,510 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Linq; +using Apache.Arrow.C; +using Apache.Arrow.Memory; +using Apache.Arrow.Types; +using Xunit; + +namespace Apache.Arrow.Tests +{ + public class ArrayDataReferenceCountingTests + { + private static Int32Array BuildInt32Array(params int[] values) + { + var builder = new Int32Array.Builder(); + builder.AppendRange(values); + return builder.Build(); + } + + private static Int32Array BuildInt32Array(MemoryAllocator allocator, params int[] values) + { + var builder = new Int32Array.Builder(); + builder.AppendRange(values); + return builder.Build(allocator); + } + + [Fact] + public void AcquireAndDispose_SingleOwner() + { + var array = BuildInt32Array(1, 2, 3); + var data = array.Data; + + // After build, data is usable + Assert.Equal(3, data.Length); + + // Dispose releases the data + array.Dispose(); + } + + [Fact] + public void AcquireAndDispose_TwoOwners() + { + var array = BuildInt32Array(1, 2, 3); + var data = array.Data; + + // Acquire a second reference + var shared = data.Acquire(); + Assert.Same(data, shared); + + // First dispose does not free the buffers (ref count goes from 2 to 1) + array.Dispose(); + + // Data is still usable via the shared reference + Assert.Equal(3, shared.Length); + Assert.Equal(Int32Type.Default, shared.DataType); + + // Verify we can still read the buffer contents + var span = shared.Buffers[1].Span; + Assert.True(span.Length > 0); + + // Second dispose frees the buffers (ref count goes from 1 to 0) + shared.Dispose(); + } + + [Fact] + public void Acquire_ThrowsOnDisposed() + { + var array = BuildInt32Array(1, 2, 3); + var data = array.Data; + + array.Dispose(); + + Assert.Throws<ObjectDisposedException>(() => data.Acquire()); + } + + [Fact] + public void SliceShared_KeepsParentAlive() + { + var array = BuildInt32Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + var sliced = array.Data.SliceShared(2, 5); + + // Dispose the original — the slice keeps the parent alive + array.Dispose(); + + // Sliced data should still be usable + Assert.Equal(5, sliced.Length); + Assert.Equal(2, sliced.Offset); + var span = sliced.Buffers[1].Span; + Assert.True(span.Length > 0); + + // Disposing the slice releases the parent + sliced.Dispose(); + } + + [Fact] + public void SliceShared_OfSliceShared_PointsToRoot() + { + var array = BuildInt32Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + var slice1 = array.Data.SliceShared(2, 6); + var slice2 = slice1.SliceShared(1, 3); + + // Dispose original and first slice + array.Dispose(); + slice1.Dispose(); + + // Second slice keeps the root alive + Assert.Equal(3, slice2.Length); + Assert.Equal(3, slice2.Offset); // 2 + 1 + + var span = slice2.Buffers[1].Span; + Assert.True(span.Length > 0); + + slice2.Dispose(); + } + + [Fact] + public void SliceShared_DisposeSliceFirst_ThenOriginal() + { + var array = BuildInt32Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + var sliced = array.Data.SliceShared(3, 4); + + // Disposing the slice first is fine — it just decrements the parent ref + sliced.Dispose(); + + // Original is still valid + Assert.Equal(10, array.Length); + Assert.Equal(0, array.GetValue(0)); + + array.Dispose(); + } + + [Fact] + public void ShareColumnsBetweenRecordBatches() + { + // Build a record batch with two columns + var col1 = BuildInt32Array(1, 2, 3); + var col2 = BuildInt32Array(4, 5, 6); + + var schema = new Schema.Builder() + .Field(new Field("a", Int32Type.Default, false)) + .Field(new Field("b", Int32Type.Default, false)) + .Build(); + + var batch1 = new RecordBatch(schema, new IArrowArray[] { col1, col2 }, 3); + + // Share column "a" into a new batch with a different column "c" + var sharedA = batch1.Column(0).Data.Acquire(); + var col3 = BuildInt32Array(7, 8, 9); + + var schema2 = new Schema.Builder() + .Field(new Field("a", Int32Type.Default, false)) + .Field(new Field("c", Int32Type.Default, false)) + .Build(); + + var batch2 = new RecordBatch(schema2, new IArrowArray[] + { + ArrowArrayFactory.BuildArray(sharedA), + col3, + }, 3); + + // Dispose original batch — shared column should stay alive in batch2 + batch1.Dispose(); + + // Verify batch2's column "a" is still readable + var aArray = (Int32Array)batch2.Column(0); + Assert.Equal(3, aArray.Length); + Assert.Equal(1, aArray.GetValue(0)); + Assert.Equal(2, aArray.GetValue(1)); + Assert.Equal(3, aArray.GetValue(2)); + + batch2.Dispose(); + } + + [Fact] + public unsafe void ExportArray_OriginalRemainsValid() + { + var array = BuildInt32Array(10, 20, 30); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportArray(array, cArray); + + // Original array should still be valid after export + Assert.Equal(3, array.Length); + Assert.Equal(10, array.GetValue(0)); + Assert.Equal(20, array.GetValue(1)); + Assert.Equal(30, array.GetValue(2)); + + // Import the exported copy and verify it + using (var imported = (Int32Array)CArrowArrayImporter.ImportArray(cArray, array.Data.DataType)) + { + Assert.Equal(3, imported.Length); + Assert.Equal(10, imported.GetValue(0)); + Assert.Equal(20, imported.GetValue(1)); + Assert.Equal(30, imported.GetValue(2)); + } + + // Original should still be usable after import is disposed + Assert.Equal(10, array.GetValue(0)); + + array.Dispose(); + CArrowArray.Free(cArray); + } + + [Fact] + public unsafe void ExportRecordBatch_OriginalRemainsValid() + { + var col1 = BuildInt32Array(1, 2, 3); + var col2 = BuildInt32Array(4, 5, 6); + + var schema = new Schema.Builder() + .Field(new Field("a", Int32Type.Default, false)) + .Field(new Field("b", Int32Type.Default, false)) + .Build(); + + var batch = new RecordBatch(schema, new IArrowArray[] { col1, col2 }, 3); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportRecordBatch(batch, cArray); + + // Original batch should still be valid + Assert.Equal(3, batch.Length); + var a = (Int32Array)batch.Column(0); + Assert.Equal(1, a.GetValue(0)); + Assert.Equal(2, a.GetValue(1)); + Assert.Equal(3, a.GetValue(2)); + + // Import and verify the exported copy + using (var imported = CArrowArrayImporter.ImportRecordBatch(cArray, schema)) + { + Assert.Equal(3, imported.Length); + var importedA = (Int32Array)imported.Column(0); + Assert.Equal(1, importedA.GetValue(0)); + } + + // Original still usable + Assert.Equal(1, ((Int32Array)batch.Column(0)).GetValue(0)); + + batch.Dispose(); + CArrowArray.Free(cArray); + } + + [Fact] + public unsafe void ExportSlicedArray_OriginalRemainsValid() + { + var array = BuildInt32Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + IArrowArray sliced = array.Slice(2, 6); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportArray(sliced, cArray); + + // Original array should still be valid + Assert.Equal(10, array.Length); + Assert.Equal(0, array.GetValue(0)); + + // Import the sliced export + using (var imported = (Int32Array)CArrowArrayImporter.ImportArray(cArray, array.Data.DataType)) + { + Assert.Equal(6, imported.Length); + Assert.Equal(2, imported.GetValue(0)); + } + + sliced.Dispose(); + array.Dispose(); + CArrowArray.Free(cArray); + } + + [Fact] + public unsafe void ExportArray_DisposeOriginalBeforeImportRelease() + { + // Verify that disposing the original C# array before the C consumer + // releases the export does not cause issues. + var array = BuildInt32Array(10, 20, 30); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportArray(array, cArray); + + // Dispose the original first + array.Dispose(); + + // The export should still be valid — the ref count keeps the data alive + using (var imported = (Int32Array)CArrowArrayImporter.ImportArray(cArray, Int32Type.Default)) + { + Assert.Equal(3, imported.Length); + Assert.Equal(10, imported.GetValue(0)); + Assert.Equal(20, imported.GetValue(1)); + Assert.Equal(30, imported.GetValue(2)); + } + + CArrowArray.Free(cArray); + } + + // --------------------------------------------------------------- + // Tracking-allocator tests: verify no leaks and no double-frees. + // TestMemoryAllocator tracks outstanding allocations via Rented, + // and throws ObjectDisposedException on double-free. + // --------------------------------------------------------------- + + [Fact] + public void Tracked_SingleOwner_FreesAll() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 1, 2, 3); + Assert.True(allocator.Rented > 0); + + array.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public void Tracked_Acquire_FreesOnlyWhenLastRefDisposed() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 1, 2, 3); + var shared = array.Data.Acquire(); + + array.Dispose(); + Assert.True(allocator.Rented > 0); // Still held by shared ref + + shared.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public void Tracked_MultipleAcquires_FreesOnlyWhenAllDisposed() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 1, 2, 3); + var ref1 = array.Data.Acquire(); + var ref2 = array.Data.Acquire(); + + array.Dispose(); + Assert.True(allocator.Rented > 0); + + ref1.Dispose(); + Assert.True(allocator.Rented > 0); + + ref2.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public void Tracked_SliceShared_FreesWhenSliceDisposed() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + var sliced = array.Data.SliceShared(2, 5); + + array.Dispose(); + Assert.True(allocator.Rented > 0); // Slice keeps parent alive + + sliced.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public void Tracked_SliceShared_DisposeSliceFirst() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + var sliced = array.Data.SliceShared(3, 4); + + sliced.Dispose(); + Assert.True(allocator.Rented > 0); // Original still holds buffers + + array.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public void Tracked_ChainedSliceShared_FreesWhenLastDisposed() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + var slice1 = array.Data.SliceShared(2, 6); + var slice2 = slice1.SliceShared(1, 3); + + array.Dispose(); + Assert.True(allocator.Rented > 0); + + slice1.Dispose(); + Assert.True(allocator.Rented > 0); + + slice2.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public void Tracked_ShareColumnsBetweenBatches_FreesAll() + { + var allocator = new TestMemoryAllocator(); + var col1 = BuildInt32Array(allocator, 1, 2, 3); + var col2 = BuildInt32Array(allocator, 4, 5, 6); + + var schema = new Schema.Builder() + .Field(new Field("a", Int32Type.Default, false)) + .Field(new Field("b", Int32Type.Default, false)) + .Build(); + + var batch1 = new RecordBatch(schema, new IArrowArray[] { col1, col2 }, 3); + + var sharedA = batch1.Column(0).Data.Acquire(); + var col3 = BuildInt32Array(allocator, 7, 8, 9); + + var schema2 = new Schema.Builder() + .Field(new Field("a", Int32Type.Default, false)) + .Field(new Field("c", Int32Type.Default, false)) + .Build(); + + var batch2 = new RecordBatch(schema2, new IArrowArray[] + { + ArrowArrayFactory.BuildArray(sharedA), + col3, + }, 3); + + batch1.Dispose(); + Assert.True(allocator.Rented > 0); // shared column + col3 still alive + + batch2.Dispose(); + Assert.Equal(0, allocator.Rented); + } + + [Fact] + public unsafe void Tracked_ExportArray_FreesAfterBothSidesDispose() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 10, 20, 30); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportArray(array, cArray); + + // Dispose C# side first + array.Dispose(); + Assert.True(allocator.Rented > 0); // Export keeps data alive + + // Release the C side via import + dispose + using (var imported = CArrowArrayImporter.ImportArray(cArray, Int32Type.Default)) + { + Assert.Equal(10, ((Int32Array)imported).GetValue(0)); + } + + Assert.Equal(0, allocator.Rented); + CArrowArray.Free(cArray); + } + + [Fact] + public unsafe void Tracked_ExportArray_ReleaseExportFirst() + { + var allocator = new TestMemoryAllocator(); + var array = BuildInt32Array(allocator, 10, 20, 30); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportArray(array, cArray); + + // Release the C side first + using (var imported = CArrowArrayImporter.ImportArray(cArray, Int32Type.Default)) + { + Assert.Equal(10, ((Int32Array)imported).GetValue(0)); + } + Assert.True(allocator.Rented > 0); // C# side still holds data + + // Now dispose the original + array.Dispose(); + Assert.Equal(0, allocator.Rented); + CArrowArray.Free(cArray); + } + + [Fact] + public unsafe void Tracked_ExportRecordBatch_FreesAll() + { + var allocator = new TestMemoryAllocator(); + var col1 = BuildInt32Array(allocator, 1, 2, 3); + var col2 = BuildInt32Array(allocator, 4, 5, 6); + + var schema = new Schema.Builder() + .Field(new Field("a", Int32Type.Default, false)) + .Field(new Field("b", Int32Type.Default, false)) + .Build(); + + var batch = new RecordBatch(schema, new IArrowArray[] { col1, col2 }, 3); + + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportRecordBatch(batch, cArray); + + batch.Dispose(); + Assert.True(allocator.Rented > 0); + + using (var imported = CArrowArrayImporter.ImportRecordBatch(cArray, schema)) + { + Assert.Equal(3, imported.Length); + } + + Assert.Equal(0, allocator.Rented); + CArrowArray.Free(cArray); + } + } +} diff --git a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs index 6a10255..427ce44 100644 --- a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs +++ b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs @@ -740,58 +740,55 @@ namespace Apache.Arrow.Tests [SkippableFact] public unsafe void ExportManagedMemoryArray() { - using (new EnableManagedExport()) + var expectedValues = Enumerable.Range(0, 100).Select(i => i % 10 == 3 ? null : (long?)i).ToArray(); + var gcRefs = new List<WeakReference>(); + + void TestExport() { - var expectedValues = Enumerable.Range(0, 100).Select(i => i % 10 == 3 ? null : (long?)i).ToArray(); - var gcRefs = new List<WeakReference>(); + var array = CreateManagedMemoryArray(expectedValues, gcRefs); - void TestExport() + dynamic pyArray; + using (Py.GIL()) { - var array = CreateManagedMemoryArray(expectedValues, gcRefs); - - dynamic pyArray; - using (Py.GIL()) - { - dynamic pa = Py.Import("pyarrow"); - pyArray = pa.array(expectedValues); - } - - CArrowArray* cArray = CArrowArray.Create(); - CArrowArrayExporter.ExportArray(array, cArray); + dynamic pa = Py.Import("pyarrow"); + pyArray = pa.array(expectedValues); + } - CArrowSchema* cSchema = CArrowSchema.Create(); - CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema); + CArrowArray* cArray = CArrowArray.Create(); + CArrowArrayExporter.ExportArray(array, cArray); - GcCollect(); - foreach (var weakRef in gcRefs) - { - Assert.True(weakRef.IsAlive); - } + CArrowSchema* cSchema = CArrowSchema.Create(); + CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema); - long arrayPtr = ((IntPtr)cArray).ToInt64(); - long schemaPtr = ((IntPtr)cSchema).ToInt64(); + GcCollect(); + foreach (var weakRef in gcRefs) + { + Assert.True(weakRef.IsAlive); + } - using (Py.GIL()) - { - dynamic pa = Py.Import("pyarrow"); - dynamic exportedPyArray = pa.Array._import_from_c(arrayPtr, schemaPtr); - Assert.True(exportedPyArray == pyArray); + long arrayPtr = ((IntPtr)cArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); - // Required for the Python object to be garbage collected: - exportedPyArray.Dispose(); - } + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic exportedPyArray = pa.Array._import_from_c(arrayPtr, schemaPtr); + Assert.True(exportedPyArray == pyArray); - CArrowArray.Free(cArray); - CArrowSchema.Free(cSchema); + // Required for the Python object to be garbage collected: + exportedPyArray.Dispose(); } - TestExport(); + CArrowArray.Free(cArray); + CArrowSchema.Free(cSchema); + } - GcCollect(); - foreach (var weakRef in gcRefs) - { - Assert.False(weakRef.IsAlive); - } + TestExport(); + + GcCollect(); + foreach (var weakRef in gcRefs) + { + Assert.False(weakRef.IsAlive); } } @@ -1009,7 +1006,6 @@ namespace Apache.Arrow.Tests var originalBatch = GetTestRecordBatch(); dynamic pyBatch = GetPythonRecordBatch(); - using (new EnableManagedExport()) using (var stream = new MemoryStream()) { var writer = new ArrowStreamWriter(stream, originalBatch.Schema); @@ -1061,7 +1057,6 @@ namespace Apache.Arrow.Tests var originalBatch = GetTestRecordBatch(); - using (new EnableManagedExport()) using (var stream = new MemoryStream()) { var writer = new ArrowStreamWriter(stream, originalBatch.Schema); @@ -1391,21 +1386,5 @@ namespace Apache.Arrow.Tests _index = -1; } } - - sealed class EnableManagedExport : IDisposable - { - readonly bool _previousValue; - - public EnableManagedExport() - { - _previousValue = CArrowArrayExporter.EnableManagedMemoryExport; - CArrowArrayExporter.EnableManagedMemoryExport = true; - } - - public void Dispose() - { - CArrowArrayExporter.EnableManagedMemoryExport = _previousValue; - } - } } } diff --git a/test/Apache.Arrow.Tests/TestMemoryAllocator.cs b/test/Apache.Arrow.Tests/TestMemoryAllocator.cs index ab38f74..2f5513c 100644 --- a/test/Apache.Arrow.Tests/TestMemoryAllocator.cs +++ b/test/Apache.Arrow.Tests/TestMemoryAllocator.cs @@ -58,7 +58,9 @@ namespace Apache.Arrow.Tests public void Dispose() { if (_disposed) - return; + { + throw new ObjectDisposedException(nameof(TestMemoryOwner), "Double-free detected"); + } _disposed = true; Interlocked.Decrement(ref _allocator._rented); _inner?.Dispose();
