This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 06793ba  Allow C Data Interface export of ReadOnlyMemory-backed buffers (#112)
06793ba is described below

commit 06793ba4e48cbf3b61e0c41c30b710c08ebcf368
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Oct 6 19:32:28 2025 -0700

    Allow C Data Interface export of ReadOnlyMemory-backed buffers (#112)
    
    This PR partly addresses #111 by allowing C Data Interface export of
    buffers backed by `ReadOnlyMemory` rather than an `IMemoryOwner`. This
    is useful, for example, when exporting data that has been read from an
    IPC file or stream. Previously, all buffers had to be copied before they
    could be exported.
    
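    Because this is an experimental and potentially risky feature, it must
    currently be enabled by setting the static field
    `CArrowArrayExporter.EnableManagedMemoryExport` to `true`. The risk is
    that exported buffers point directly into pinned managed memory: using
    the exported data after the source `RecordBatch` has been disposed could
    cause memory corruption or access violations.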
    
    This change was originally submitted by @adamreeve as
    https://github.com/apache/arrow/pull/40992.
    My only contribution is disabling the feature by default.
---
 src/Apache.Arrow/ArrowBuffer.cs                    |   8 +
 src/Apache.Arrow/C/CArrowArrayExporter.cs          |   5 +
 src/Apache.Arrow/Memory/ExportedAllocationOwner.cs |  18 +-
 src/Apache.Arrow/Memory/NativeMemoryManager.cs     |  15 +-
 .../CDataInterfacePythonTests.cs                   | 191 +++++++++++++++++++++
 5 files changed, 232 insertions(+), 5 deletions(-)

diff --git a/src/Apache.Arrow/ArrowBuffer.cs b/src/Apache.Arrow/ArrowBuffer.cs
index ef98bdc..28a06c3 100644
--- a/src/Apache.Arrow/ArrowBuffer.cs
+++ b/src/Apache.Arrow/ArrowBuffer.cs
@@ -16,6 +16,7 @@
 using System;
 using System.Buffers;
 using System.Runtime.CompilerServices;
+using Apache.Arrow.C;
 using Apache.Arrow.Memory;
 
 namespace Apache.Arrow
@@ -89,6 +90,13 @@ namespace Apache.Arrow
                 return true;
             }
 
+            if (_memoryOwner == null && 
CArrowArrayExporter.EnableManagedMemoryExport)
+            {
+                var handle = _memory.Pin();
+                ptr = newOwner.Reference(handle);
+                return true;
+            }
+
             ptr = IntPtr.Zero;
             return false;
         }
diff --git a/src/Apache.Arrow/C/CArrowArrayExporter.cs 
b/src/Apache.Arrow/C/CArrowArrayExporter.cs
index 4bbf9e1..9e9baf3 100644
--- a/src/Apache.Arrow/C/CArrowArrayExporter.cs
+++ b/src/Apache.Arrow/C/CArrowArrayExporter.cs
@@ -26,6 +26,11 @@ namespace Apache.Arrow.C
 {
     public static class CArrowArrayExporter
     {
+        /// <summary>
+        /// Experimental feature to enable exporting managed memory to 
CArrowArray. Use with caution.
+        /// </summary>
+        public static bool EnableManagedMemoryExport = false;
+
 #if NET5_0_OR_GREATER
         private static unsafe delegate* unmanaged<CArrowArray*, void> 
ReleaseArrayPtr => &ReleaseArray;
 #else
diff --git a/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs 
b/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
index 516e7d7..cd52f6d 100644
--- a/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
+++ b/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
@@ -14,16 +14,17 @@
 // limitations under the License.
 
 using System;
+using System.Buffers;
 using System.Collections.Generic;
-using System.Diagnostics;
 using System.Runtime.InteropServices;
 using System.Threading;
 
 namespace Apache.Arrow.Memory
 {
-    internal sealed class ExportedAllocationOwner : INativeAllocationOwner, 
IDisposable
+    internal sealed class ExportedAllocationOwner : IDisposable
     {
         private readonly List<IntPtr> _pointers = new List<IntPtr>();
+        private readonly List<MemoryHandle> _handles = new 
List<MemoryHandle>();
         private long _allocationSize;
         private long _referenceCount;
         private bool _disposed;
@@ -46,9 +47,10 @@ namespace Apache.Arrow.Memory
             return ptr;
         }
 
-        public void Release(IntPtr ptr, int offset, int length)
+        public unsafe IntPtr Reference(MemoryHandle handle)
         {
-            throw new InvalidOperationException();
+            _handles.Add(handle);
+            return new IntPtr(handle.Pointer);
         }
 
         public void IncRef()
@@ -70,6 +72,7 @@ namespace Apache.Arrow.Memory
             {
                 return;
             }
+
             for (int i = 0; i < _pointers.Count; i++)
             {
                 if (_pointers[i] != IntPtr.Zero)
@@ -78,6 +81,13 @@ namespace Apache.Arrow.Memory
                     _pointers[i] = IntPtr.Zero;
                 }
             }
+
+            for (int i = 0; i < _handles.Count; i++)
+            {
+                _handles[i].Dispose();
+                _handles[i] = default;
+            }
+
             GC.RemoveMemoryPressure(_allocationSize);
             GC.SuppressFinalize(this);
             _disposed = true;
diff --git a/src/Apache.Arrow/Memory/NativeMemoryManager.cs 
b/src/Apache.Arrow/Memory/NativeMemoryManager.cs
index d42ee52..5fb51be 100644
--- a/src/Apache.Arrow/Memory/NativeMemoryManager.cs
+++ b/src/Apache.Arrow/Memory/NativeMemoryManager.cs
@@ -23,6 +23,7 @@ namespace Apache.Arrow.Memory
     public class NativeMemoryManager : MemoryManager<byte>, IOwnableAllocation
     {
         private IntPtr _ptr;
+        private int _pinCount;
         private readonly int _offset;
         private readonly int _length;
         private readonly INativeAllocationOwner _owner;
@@ -58,6 +59,7 @@ namespace Apache.Arrow.Memory
             // NOTE: Unmanaged memory doesn't require GC pinning because by 
definition it's not
             // managed by the garbage collector.
 
+            Interlocked.Increment(ref _pinCount);
             void* ptr = CalculatePointer(elementIndex);
             return new MemoryHandle(ptr, default, this);
         }
@@ -65,7 +67,7 @@ namespace Apache.Arrow.Memory
         public override void Unpin()
         {
             // SEE: Pin implementation
-            return;
+            Interlocked.Decrement(ref _pinCount);
         }
 
         protected override void Dispose(bool disposing)
@@ -74,6 +76,17 @@ namespace Apache.Arrow.Memory
             IntPtr ptr = Interlocked.Exchange(ref _ptr, IntPtr.Zero);
             if (ptr != IntPtr.Zero)
             {
+                if (disposing)
+                {
+                    // Only need to check for pinned data when disposing.
+                    // If disposed from the finalizer, there can be no 
MemoryHandles to this memory.
+                    if (_pinCount > 0)
+                    {
+                        _ptr = ptr;
+                        throw new InvalidOperationException("cannot free 
native memory while it is pinned");
+                    }
+                }
+
                 _owner.Release(ptr, _offset, _length);
             }
         }
diff --git a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs 
b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
index 44301f9..6dfb522 100644
--- a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
+++ b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
@@ -722,6 +722,64 @@ namespace Apache.Arrow.Tests
             CArrowSchema.Free(cSchema);
         }
 
+        [SkippableFact]
+        public unsafe void ExportManagedMemoryArray()
+        {
+            using (new EnableManagedExport())
+            {
+                var expectedValues = Enumerable.Range(0, 100).Select(i => i % 
10 == 3 ? null : (long?)i).ToArray();
+                var gcRefs = new List<WeakReference>();
+
+                void TestExport()
+                {
+                    var array = CreateManagedMemoryArray(expectedValues, 
gcRefs);
+
+                    dynamic pyArray;
+                    using (Py.GIL())
+                    {
+                        dynamic pa = Py.Import("pyarrow");
+                        pyArray = pa.array(expectedValues);
+                    }
+
+                    CArrowArray* cArray = CArrowArray.Create();
+                    CArrowArrayExporter.ExportArray(array, cArray);
+
+                    CArrowSchema* cSchema = CArrowSchema.Create();
+                    CArrowSchemaExporter.ExportType(array.Data.DataType, 
cSchema);
+
+                    GcCollect();
+                    foreach (var weakRef in gcRefs)
+                    {
+                        Assert.True(weakRef.IsAlive);
+                    }
+
+                    long arrayPtr = ((IntPtr)cArray).ToInt64();
+                    long schemaPtr = ((IntPtr)cSchema).ToInt64();
+
+                    using (Py.GIL())
+                    {
+                        dynamic pa = Py.Import("pyarrow");
+                        dynamic exportedPyArray = 
pa.Array._import_from_c(arrayPtr, schemaPtr);
+                        Assert.True(exportedPyArray == pyArray);
+
+                        // Required for the Python object to be garbage 
collected:
+                        exportedPyArray.Dispose();
+                    }
+
+                    CArrowArray.Free(cArray);
+                    CArrowSchema.Free(cSchema);
+                }
+
+                TestExport();
+
+                GcCollect();
+                foreach (var weakRef in gcRefs)
+                {
+                    Assert.False(weakRef.IsAlive);
+                }
+            }
+        }
+
         [SkippableFact]
         public unsafe void ExportBatch()
         {
@@ -930,6 +988,95 @@ namespace Apache.Arrow.Tests
             }
         }
 
+        [SkippableFact]
+        public async Task ExportBatchReadFromIpc()
+        {
+            var originalBatch = GetTestRecordBatch();
+            dynamic pyBatch = GetPythonRecordBatch();
+
+            using (new EnableManagedExport())
+            using (var stream = new MemoryStream())
+            {
+                var writer = new ArrowStreamWriter(stream, 
originalBatch.Schema);
+                await writer.WriteRecordBatchAsync(originalBatch);
+                await writer.WriteEndAsync();
+
+                stream.Seek(0, SeekOrigin.Begin);
+
+                var reader = new ArrowStreamReader(stream);
+                using var batch = await reader.ReadNextRecordBatchAsync();
+
+                Assert.NotNull(batch);
+                Assert.Equal(originalBatch.Length, batch.Length);
+
+                unsafe
+                {
+                    CArrowArray* cArray = CArrowArray.Create();
+                    CArrowArrayExporter.ExportRecordBatch(batch, cArray);
+
+                    CArrowSchema* cSchema = CArrowSchema.Create();
+                    CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);
+
+                    long arrayPtr = ((IntPtr)cArray).ToInt64();
+                    long schemaPtr = ((IntPtr)cSchema).ToInt64();
+
+                    using (Py.GIL())
+                    {
+                        dynamic pa = Py.Import("pyarrow");
+                        dynamic exportedPyArray = 
pa.RecordBatch._import_from_c(arrayPtr, schemaPtr);
+                        Assert.True(exportedPyArray == pyBatch);
+
+                        // Dispose to unpin memory
+                        exportedPyArray.Dispose();
+                    }
+
+                    CArrowArray.Free(cArray);
+                    CArrowSchema.Free(cSchema);
+                }
+            }
+        }
+
+        [SkippableFact]
+        public async Task EarlyDisposeOfExportedBatch()
+        {
+            // Reading IPC data from a Stream creates Arrow buffers backed by 
ReadOnlyMemory that point
+            // to slices of a single memory buffer owned by the RecordBatch 
(unless compression is used).
+            // Using the exported data after the RecordBatch has been disposed 
could cause
+            // memory corruption or access violations.
+
+            var originalBatch = GetTestRecordBatch();
+
+            using (new EnableManagedExport())
+            using (var stream = new MemoryStream())
+            {
+                var writer = new ArrowStreamWriter(stream, 
originalBatch.Schema);
+                await writer.WriteRecordBatchAsync(originalBatch);
+                await writer.WriteEndAsync();
+
+                stream.Seek(0, SeekOrigin.Begin);
+
+                var reader = new ArrowStreamReader(stream);
+                using var batch = await reader.ReadNextRecordBatchAsync();
+
+                Assert.NotNull(batch);
+                Assert.Equal(originalBatch.Length, batch.Length);
+
+                unsafe
+                {
+                    CArrowArray* cArray = CArrowArray.Create();
+                    CArrowArrayExporter.ExportRecordBatch(batch, cArray);
+
+                    CArrowSchema* cSchema = CArrowSchema.Create();
+                    CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);
+
+                    Assert.Throws<InvalidOperationException>(() => 
batch.Dispose());
+
+                    CArrowArray.Free(cArray);
+                    CArrowSchema.Free(cSchema);
+                }
+            }
+        }
+
         private static PyObject List(params int?[] values)
         {
             return new PyList(values.Select(i => i == null ? PyObject.None : 
new PyInt(i.Value)).ToArray());
@@ -960,6 +1107,34 @@ namespace Apache.Arrow.Tests
             return new PyTuple(values.Select(i => i == null ? PyObject.None : 
new PyInt(i.Value)).ToArray());
         }
 
+        private static IArrowArray CreateManagedMemoryArray(long?[] values, 
List<WeakReference> gcRefs)
+        {
+            var data = new byte[values.Length * sizeof(long)];
+            var validity = new byte[BitUtility.ByteCount(values.Length)];
+            var typedData = data.AsSpan().CastTo<long>();
+            var nullCount = 0;
+            for (var i = 0; i < values.Length; ++i)
+            {
+                BitUtility.SetBit(validity, i, values[i].HasValue);
+                typedData[i] = values[i].GetValueOrDefault(0);
+                nullCount += values[i].HasValue ? 0 : 1;
+            }
+
+            gcRefs.Add(new WeakReference(data));
+            gcRefs.Add(new WeakReference(validity));
+
+            return new Int64Array(new ArrowBuffer(data), new 
ArrowBuffer(validity), values.Length, nullCount, 0);
+        }
+
+        private static void GcCollect()
+        {
+            for (int i = 0; i < 3; ++i)
+            {
+                GC.Collect();
+                GC.WaitForPendingFinalizers();
+            }
+        }
+
         sealed class TestArrayStream : IArrowArrayStream
         {
             private readonly RecordBatch[] _batches;
@@ -986,5 +1161,21 @@ namespace Apache.Arrow.Tests
                 _index = -1;
             }
         }
+
+        sealed class EnableManagedExport : IDisposable
+        {
+            readonly bool _previousValue;
+
+            public EnableManagedExport()
+            {
+                _previousValue = CArrowArrayExporter.EnableManagedMemoryExport;
+                CArrowArrayExporter.EnableManagedMemoryExport = true;
+            }
+
+            public void Dispose()
+            {
+                CArrowArrayExporter.EnableManagedMemoryExport = _previousValue;
+            }
+        }
     }
 }

Reply via email to