This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 06793ba Allow C Data Interface export of ReadOnlyMemory-backed buffers (#112)
06793ba is described below
commit 06793ba4e48cbf3b61e0c41c30b710c08ebcf368
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Oct 6 19:32:28 2025 -0700
Allow C Data Interface export of ReadOnlyMemory-backed buffers (#112)
This PR partly addresses #111 by allowing C Data Interface export of
buffers backed by `ReadOnlyMemory` rather than an `IMemoryOwner`. This
is useful if you want to export data that has been read from an IPC file
or stream for example. Previously you would need to first copy all
buffers for this to work.
Because this is an experimental feature and potentially risky, it must
currently be enabled by setting the global static
`CArrowArrayExporter.EnableManagedMemoryExport` to `true`.
This change was originally submitted by @adamreeve as
[https://github.com/apache/arrow/pull/40992](https://github.com/apache/arrow/pull/40992).
My only contribution is disabling the feature by default.
---
src/Apache.Arrow/ArrowBuffer.cs | 8 +
src/Apache.Arrow/C/CArrowArrayExporter.cs | 5 +
src/Apache.Arrow/Memory/ExportedAllocationOwner.cs | 18 +-
src/Apache.Arrow/Memory/NativeMemoryManager.cs | 15 +-
.../CDataInterfacePythonTests.cs | 191 +++++++++++++++++++++
5 files changed, 232 insertions(+), 5 deletions(-)
diff --git a/src/Apache.Arrow/ArrowBuffer.cs b/src/Apache.Arrow/ArrowBuffer.cs
index ef98bdc..28a06c3 100644
--- a/src/Apache.Arrow/ArrowBuffer.cs
+++ b/src/Apache.Arrow/ArrowBuffer.cs
@@ -16,6 +16,7 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
+using Apache.Arrow.C;
using Apache.Arrow.Memory;
namespace Apache.Arrow
@@ -89,6 +90,13 @@ namespace Apache.Arrow
return true;
}
+ if (_memoryOwner == null &&
CArrowArrayExporter.EnableManagedMemoryExport)
+ {
+ var handle = _memory.Pin();
+ ptr = newOwner.Reference(handle);
+ return true;
+ }
+
ptr = IntPtr.Zero;
return false;
}
diff --git a/src/Apache.Arrow/C/CArrowArrayExporter.cs b/src/Apache.Arrow/C/CArrowArrayExporter.cs
index 4bbf9e1..9e9baf3 100644
--- a/src/Apache.Arrow/C/CArrowArrayExporter.cs
+++ b/src/Apache.Arrow/C/CArrowArrayExporter.cs
@@ -26,6 +26,11 @@ namespace Apache.Arrow.C
{
public static class CArrowArrayExporter
{
+ /// <summary>
+ /// Experimental feature to enable exporting managed memory to
CArrowArray. Use with caution.
+ /// </summary>
+ public static bool EnableManagedMemoryExport = false;
+
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowArray*, void>
ReleaseArrayPtr => &ReleaseArray;
#else
diff --git a/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs b/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
index 516e7d7..cd52f6d 100644
--- a/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
+++ b/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
@@ -14,16 +14,17 @@
// limitations under the License.
using System;
+using System.Buffers;
using System.Collections.Generic;
-using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
namespace Apache.Arrow.Memory
{
- internal sealed class ExportedAllocationOwner : INativeAllocationOwner,
IDisposable
+ internal sealed class ExportedAllocationOwner : IDisposable
{
private readonly List<IntPtr> _pointers = new List<IntPtr>();
+ private readonly List<MemoryHandle> _handles = new
List<MemoryHandle>();
private long _allocationSize;
private long _referenceCount;
private bool _disposed;
@@ -46,9 +47,10 @@ namespace Apache.Arrow.Memory
return ptr;
}
- public void Release(IntPtr ptr, int offset, int length)
+ public unsafe IntPtr Reference(MemoryHandle handle)
{
- throw new InvalidOperationException();
+ _handles.Add(handle);
+ return new IntPtr(handle.Pointer);
}
public void IncRef()
@@ -70,6 +72,7 @@ namespace Apache.Arrow.Memory
{
return;
}
+
for (int i = 0; i < _pointers.Count; i++)
{
if (_pointers[i] != IntPtr.Zero)
@@ -78,6 +81,13 @@ namespace Apache.Arrow.Memory
_pointers[i] = IntPtr.Zero;
}
}
+
+ for (int i = 0; i < _handles.Count; i++)
+ {
+ _handles[i].Dispose();
+ _handles[i] = default;
+ }
+
GC.RemoveMemoryPressure(_allocationSize);
GC.SuppressFinalize(this);
_disposed = true;
diff --git a/src/Apache.Arrow/Memory/NativeMemoryManager.cs b/src/Apache.Arrow/Memory/NativeMemoryManager.cs
index d42ee52..5fb51be 100644
--- a/src/Apache.Arrow/Memory/NativeMemoryManager.cs
+++ b/src/Apache.Arrow/Memory/NativeMemoryManager.cs
@@ -23,6 +23,7 @@ namespace Apache.Arrow.Memory
public class NativeMemoryManager : MemoryManager<byte>, IOwnableAllocation
{
private IntPtr _ptr;
+ private int _pinCount;
private readonly int _offset;
private readonly int _length;
private readonly INativeAllocationOwner _owner;
@@ -58,6 +59,7 @@ namespace Apache.Arrow.Memory
// NOTE: Unmanaged memory doesn't require GC pinning because by
definition it's not
// managed by the garbage collector.
+ Interlocked.Increment(ref _pinCount);
void* ptr = CalculatePointer(elementIndex);
return new MemoryHandle(ptr, default, this);
}
@@ -65,7 +67,7 @@ namespace Apache.Arrow.Memory
public override void Unpin()
{
// SEE: Pin implementation
- return;
+ Interlocked.Decrement(ref _pinCount);
}
protected override void Dispose(bool disposing)
@@ -74,6 +76,17 @@ namespace Apache.Arrow.Memory
IntPtr ptr = Interlocked.Exchange(ref _ptr, IntPtr.Zero);
if (ptr != IntPtr.Zero)
{
+ if (disposing)
+ {
+ // Only need to check for pinned data when disposing.
+ // If disposed from the finalizer, there can be no
MemoryHandles to this memory.
+ if (_pinCount > 0)
+ {
+ _ptr = ptr;
+ throw new InvalidOperationException("cannot free
native memory while it is pinned");
+ }
+ }
+
_owner.Release(ptr, _offset, _length);
}
}
diff --git a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
index 44301f9..6dfb522 100644
--- a/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
+++ b/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
@@ -722,6 +722,64 @@ namespace Apache.Arrow.Tests
CArrowSchema.Free(cSchema);
}
+ [SkippableFact]
+ public unsafe void ExportManagedMemoryArray()
+ {
+ using (new EnableManagedExport())
+ {
+ var expectedValues = Enumerable.Range(0, 100).Select(i => i %
10 == 3 ? null : (long?)i).ToArray();
+ var gcRefs = new List<WeakReference>();
+
+ void TestExport()
+ {
+ var array = CreateManagedMemoryArray(expectedValues,
gcRefs);
+
+ dynamic pyArray;
+ using (Py.GIL())
+ {
+ dynamic pa = Py.Import("pyarrow");
+ pyArray = pa.array(expectedValues);
+ }
+
+ CArrowArray* cArray = CArrowArray.Create();
+ CArrowArrayExporter.ExportArray(array, cArray);
+
+ CArrowSchema* cSchema = CArrowSchema.Create();
+ CArrowSchemaExporter.ExportType(array.Data.DataType,
cSchema);
+
+ GcCollect();
+ foreach (var weakRef in gcRefs)
+ {
+ Assert.True(weakRef.IsAlive);
+ }
+
+ long arrayPtr = ((IntPtr)cArray).ToInt64();
+ long schemaPtr = ((IntPtr)cSchema).ToInt64();
+
+ using (Py.GIL())
+ {
+ dynamic pa = Py.Import("pyarrow");
+ dynamic exportedPyArray =
pa.Array._import_from_c(arrayPtr, schemaPtr);
+ Assert.True(exportedPyArray == pyArray);
+
+ // Required for the Python object to be garbage
collected:
+ exportedPyArray.Dispose();
+ }
+
+ CArrowArray.Free(cArray);
+ CArrowSchema.Free(cSchema);
+ }
+
+ TestExport();
+
+ GcCollect();
+ foreach (var weakRef in gcRefs)
+ {
+ Assert.False(weakRef.IsAlive);
+ }
+ }
+ }
+
[SkippableFact]
public unsafe void ExportBatch()
{
@@ -930,6 +988,95 @@ namespace Apache.Arrow.Tests
}
}
+ [SkippableFact]
+ public async Task ExportBatchReadFromIpc()
+ {
+ var originalBatch = GetTestRecordBatch();
+ dynamic pyBatch = GetPythonRecordBatch();
+
+ using (new EnableManagedExport())
+ using (var stream = new MemoryStream())
+ {
+ var writer = new ArrowStreamWriter(stream,
originalBatch.Schema);
+ await writer.WriteRecordBatchAsync(originalBatch);
+ await writer.WriteEndAsync();
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ var reader = new ArrowStreamReader(stream);
+ using var batch = await reader.ReadNextRecordBatchAsync();
+
+ Assert.NotNull(batch);
+ Assert.Equal(originalBatch.Length, batch.Length);
+
+ unsafe
+ {
+ CArrowArray* cArray = CArrowArray.Create();
+ CArrowArrayExporter.ExportRecordBatch(batch, cArray);
+
+ CArrowSchema* cSchema = CArrowSchema.Create();
+ CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);
+
+ long arrayPtr = ((IntPtr)cArray).ToInt64();
+ long schemaPtr = ((IntPtr)cSchema).ToInt64();
+
+ using (Py.GIL())
+ {
+ dynamic pa = Py.Import("pyarrow");
+ dynamic exportedPyArray =
pa.RecordBatch._import_from_c(arrayPtr, schemaPtr);
+ Assert.True(exportedPyArray == pyBatch);
+
+ // Dispose to unpin memory
+ exportedPyArray.Dispose();
+ }
+
+ CArrowArray.Free(cArray);
+ CArrowSchema.Free(cSchema);
+ }
+ }
+ }
+
+ [SkippableFact]
+ public async Task EarlyDisposeOfExportedBatch()
+ {
+ // Reading IPC data from a Stream creates Arrow buffers backed by
ReadOnlyMemory that point
+ // to slices of a single memory buffer owned by the RecordBatch
(unless compression is used).
+ // Using the exported data after the RecordBatch has been disposed
could cause
+ // memory corruption or access violations.
+
+ var originalBatch = GetTestRecordBatch();
+
+ using (new EnableManagedExport())
+ using (var stream = new MemoryStream())
+ {
+ var writer = new ArrowStreamWriter(stream,
originalBatch.Schema);
+ await writer.WriteRecordBatchAsync(originalBatch);
+ await writer.WriteEndAsync();
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ var reader = new ArrowStreamReader(stream);
+ using var batch = await reader.ReadNextRecordBatchAsync();
+
+ Assert.NotNull(batch);
+ Assert.Equal(originalBatch.Length, batch.Length);
+
+ unsafe
+ {
+ CArrowArray* cArray = CArrowArray.Create();
+ CArrowArrayExporter.ExportRecordBatch(batch, cArray);
+
+ CArrowSchema* cSchema = CArrowSchema.Create();
+ CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema);
+
+ Assert.Throws<InvalidOperationException>(() =>
batch.Dispose());
+
+ CArrowArray.Free(cArray);
+ CArrowSchema.Free(cSchema);
+ }
+ }
+ }
+
private static PyObject List(params int?[] values)
{
return new PyList(values.Select(i => i == null ? PyObject.None :
new PyInt(i.Value)).ToArray());
@@ -960,6 +1107,34 @@ namespace Apache.Arrow.Tests
return new PyTuple(values.Select(i => i == null ? PyObject.None :
new PyInt(i.Value)).ToArray());
}
+ private static IArrowArray CreateManagedMemoryArray(long?[] values,
List<WeakReference> gcRefs)
+ {
+ var data = new byte[values.Length * sizeof(long)];
+ var validity = new byte[BitUtility.ByteCount(values.Length)];
+ var typedData = data.AsSpan().CastTo<long>();
+ var nullCount = 0;
+ for (var i = 0; i < values.Length; ++i)
+ {
+ BitUtility.SetBit(validity, i, values[i].HasValue);
+ typedData[i] = values[i].GetValueOrDefault(0);
+ nullCount += values[i].HasValue ? 0 : 1;
+ }
+
+ gcRefs.Add(new WeakReference(data));
+ gcRefs.Add(new WeakReference(validity));
+
+ return new Int64Array(new ArrowBuffer(data), new
ArrowBuffer(validity), values.Length, nullCount, 0);
+ }
+
+ private static void GcCollect()
+ {
+ for (int i = 0; i < 3; ++i)
+ {
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ }
+ }
+
sealed class TestArrayStream : IArrowArrayStream
{
private readonly RecordBatch[] _batches;
@@ -986,5 +1161,21 @@ namespace Apache.Arrow.Tests
_index = -1;
}
}
+
+ sealed class EnableManagedExport : IDisposable
+ {
+ readonly bool _previousValue;
+
+ public EnableManagedExport()
+ {
+ _previousValue = CArrowArrayExporter.EnableManagedMemoryExport;
+ CArrowArrayExporter.EnableManagedMemoryExport = true;
+ }
+
+ public void Dispose()
+ {
+ CArrowArrayExporter.EnableManagedMemoryExport = _previousValue;
+ }
+ }
}
}