Copilot commented on code in PR #305: URL: https://github.com/apache/arrow-dotnet/pull/305#discussion_r3019490462
########## test/Apache.Arrow.Tests/CDeviceDataInterfacePythonTests.cs: ########## @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Apache.Arrow.C; +using Apache.Arrow.Types; +using Python.Runtime; +using Xunit; + +namespace Apache.Arrow.Tests +{ + [Collection("PythonNet")] + public class CDeviceDataInterfacePythonTest + { + public CDeviceDataInterfacePythonTest(PythonNetFixture pythonNet) + { + pythonNet.EnsureInitialized(); + } + + private IArrowArray GetTestArray() + { + var builder = new StringArray.Builder(); + builder.Append("hello"); + builder.Append("world"); + builder.AppendNull(); + builder.Append("foo"); + builder.Append("bar"); + return builder.Build(); + } + + private dynamic GetPythonArray() + { + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + return pa.array(new[] { "hello", "world", null, "foo", "bar" }); + } + } + + private RecordBatch GetTestRecordBatch() + { + Field[] fields = new[] + { + new Field("col1", Int64Type.Default, true), + new Field("col2", StringType.Default, true), + new Field("col3", DoubleType.Default, true), + }; + return new RecordBatch( + new Schema(fields, null), + new IArrowArray[] + { + new 
Int64Array.Builder().AppendRange(new long[] { 1, 2, 3 }).AppendNull().Append(5).Build(), + GetTestArray(), + new DoubleArray.Builder().AppendRange(new double[] { 0.0, 1.4, 2.5, 3.6, 4.7 }).Build(), + }, + 5); + } + + private dynamic GetPythonRecordBatch() + { + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic table = pa.table( + new PyList(new PyObject[] + { + pa.array(new long?[] { 1, 2, 3, null, 5 }), + pa.array(new[] { "hello", "world", null, "foo", "bar" }), + pa.array(new[] { 0.0, 1.4, 2.5, 3.6, 4.7 }) + }), + new[] { "col1", "col2", "col3" }); + + return table.to_batches()[0]; + } + } + + [SkippableFact] + public unsafe void ExportArrayToDeviceAndImportInPython() + { + IArrowArray array = GetTestArray(); + dynamic pyArray = GetPythonArray(); + + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowDeviceArrayExporter.ExportArray(array, cDeviceArray); + + CArrowSchema* cSchema = CArrowSchema.Create(); + CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema); + + long deviceArrayPtr = ((IntPtr)cDeviceArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); + + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic exportedPyArray = pa.Array._import_from_c_device(deviceArrayPtr, schemaPtr); + Assert.True(exportedPyArray == pyArray); + } + + CArrowDeviceArray.Free(cDeviceArray); + CArrowSchema.Free(cSchema); + } + + [SkippableFact] + public unsafe void ImportArrayFromPythonDevice() + { + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowSchema* cSchema = CArrowSchema.Create(); + Review Comment: `cSchema` is allocated via `CArrowSchema.Create()` but never freed in this test. Even though `ImportType(...)` calls the schema release callback, the unmanaged allocation from `Create()` still needs a matching `CArrowSchema.Free(cSchema)` to avoid leaking memory across the test suite. 
########## test/Apache.Arrow.Tests/CDeviceDataInterfacePythonTests.cs: ########## @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Apache.Arrow.C; +using Apache.Arrow.Types; +using Python.Runtime; +using Xunit; + +namespace Apache.Arrow.Tests +{ + [Collection("PythonNet")] + public class CDeviceDataInterfacePythonTest + { + public CDeviceDataInterfacePythonTest(PythonNetFixture pythonNet) + { + pythonNet.EnsureInitialized(); + } + + private IArrowArray GetTestArray() + { + var builder = new StringArray.Builder(); + builder.Append("hello"); + builder.Append("world"); + builder.AppendNull(); + builder.Append("foo"); + builder.Append("bar"); + return builder.Build(); + } + + private dynamic GetPythonArray() + { + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + return pa.array(new[] { "hello", "world", null, "foo", "bar" }); + } + } + + private RecordBatch GetTestRecordBatch() + { + Field[] fields = new[] + { + new Field("col1", Int64Type.Default, true), + new Field("col2", StringType.Default, true), + new Field("col3", DoubleType.Default, true), + }; + return new RecordBatch( + new Schema(fields, null), + new IArrowArray[] + { + new 
Int64Array.Builder().AppendRange(new long[] { 1, 2, 3 }).AppendNull().Append(5).Build(), + GetTestArray(), + new DoubleArray.Builder().AppendRange(new double[] { 0.0, 1.4, 2.5, 3.6, 4.7 }).Build(), + }, + 5); + } + + private dynamic GetPythonRecordBatch() + { + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic table = pa.table( + new PyList(new PyObject[] + { + pa.array(new long?[] { 1, 2, 3, null, 5 }), + pa.array(new[] { "hello", "world", null, "foo", "bar" }), + pa.array(new[] { 0.0, 1.4, 2.5, 3.6, 4.7 }) + }), + new[] { "col1", "col2", "col3" }); + + return table.to_batches()[0]; + } + } + + [SkippableFact] + public unsafe void ExportArrayToDeviceAndImportInPython() + { + IArrowArray array = GetTestArray(); + dynamic pyArray = GetPythonArray(); + + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowDeviceArrayExporter.ExportArray(array, cDeviceArray); + + CArrowSchema* cSchema = CArrowSchema.Create(); + CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema); + + long deviceArrayPtr = ((IntPtr)cDeviceArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); + + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic exportedPyArray = pa.Array._import_from_c_device(deviceArrayPtr, schemaPtr); + Assert.True(exportedPyArray == pyArray); + } + + CArrowDeviceArray.Free(cDeviceArray); + CArrowSchema.Free(cSchema); + } + + [SkippableFact] + public unsafe void ImportArrayFromPythonDevice() + { + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowSchema* cSchema = CArrowSchema.Create(); + + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic pyArray = pa.array(new[] { "hello", "world", null, "foo", "bar" }); + + long deviceArrayPtr = ((IntPtr)cDeviceArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); + pyArray._export_to_c_device(deviceArrayPtr, schemaPtr); + } + + // Verify device fields set by PyArrow + Assert.Equal(ArrowDeviceType.Cpu, 
cDeviceArray->device_type); + Assert.Equal(-1, cDeviceArray->device_id); + Assert.True(cDeviceArray->sync_event == null); + + ArrowType type = CArrowSchemaImporter.ImportType(cSchema); + IArrowArray importedArray = CArrowDeviceArrayImporter.ImportArray(cDeviceArray, type); + StringArray importedStrings = (StringArray)importedArray; + + Assert.Equal(5, importedStrings.Length); + Assert.Equal("hello", importedStrings.GetString(0)); + Assert.Equal("world", importedStrings.GetString(1)); + Assert.Null(importedStrings.GetString(2)); + Assert.Equal("foo", importedStrings.GetString(3)); + Assert.Equal("bar", importedStrings.GetString(4)); Review Comment: `ImportArray(...)` returns an `IArrowArray` that should be disposed to ensure the underlying C release callback runs deterministically. Right now `importedArray` is never disposed, which can leak/pin buffers until finalization. ```suggestion using (IArrowArray importedArray = CArrowDeviceArrayImporter.ImportArray(cDeviceArray, type)) { StringArray importedStrings = (StringArray)importedArray; Assert.Equal(5, importedStrings.Length); Assert.Equal("hello", importedStrings.GetString(0)); Assert.Equal("world", importedStrings.GetString(1)); Assert.Null(importedStrings.GetString(2)); Assert.Equal("foo", importedStrings.GetString(3)); Assert.Equal("bar", importedStrings.GetString(4)); } ``` ########## test/Apache.Arrow.Tests/CDeviceDataInterfacePythonTests.cs: ########## @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using Apache.Arrow.C; +using Apache.Arrow.Types; +using Python.Runtime; +using Xunit; + +namespace Apache.Arrow.Tests +{ + [Collection("PythonNet")] + public class CDeviceDataInterfacePythonTest + { + public CDeviceDataInterfacePythonTest(PythonNetFixture pythonNet) + { + pythonNet.EnsureInitialized(); + } + + private IArrowArray GetTestArray() + { + var builder = new StringArray.Builder(); + builder.Append("hello"); + builder.Append("world"); + builder.AppendNull(); + builder.Append("foo"); + builder.Append("bar"); + return builder.Build(); + } + + private dynamic GetPythonArray() + { + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + return pa.array(new[] { "hello", "world", null, "foo", "bar" }); + } + } + + private RecordBatch GetTestRecordBatch() + { + Field[] fields = new[] + { + new Field("col1", Int64Type.Default, true), + new Field("col2", StringType.Default, true), + new Field("col3", DoubleType.Default, true), + }; + return new RecordBatch( + new Schema(fields, null), + new IArrowArray[] + { + new Int64Array.Builder().AppendRange(new long[] { 1, 2, 3 }).AppendNull().Append(5).Build(), + GetTestArray(), + new DoubleArray.Builder().AppendRange(new double[] { 0.0, 1.4, 2.5, 3.6, 4.7 }).Build(), + }, + 5); + } + + private dynamic GetPythonRecordBatch() + { + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic table = pa.table( + new PyList(new PyObject[] + { + pa.array(new long?[] { 1, 2, 3, null, 5 }), + pa.array(new[] { "hello", "world", null, "foo", "bar" }), + 
pa.array(new[] { 0.0, 1.4, 2.5, 3.6, 4.7 }) + }), + new[] { "col1", "col2", "col3" }); + + return table.to_batches()[0]; + } + } + + [SkippableFact] + public unsafe void ExportArrayToDeviceAndImportInPython() + { + IArrowArray array = GetTestArray(); + dynamic pyArray = GetPythonArray(); + + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowDeviceArrayExporter.ExportArray(array, cDeviceArray); + + CArrowSchema* cSchema = CArrowSchema.Create(); + CArrowSchemaExporter.ExportType(array.Data.DataType, cSchema); + + long deviceArrayPtr = ((IntPtr)cDeviceArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); + + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic exportedPyArray = pa.Array._import_from_c_device(deviceArrayPtr, schemaPtr); + Assert.True(exportedPyArray == pyArray); + } + + CArrowDeviceArray.Free(cDeviceArray); + CArrowSchema.Free(cSchema); + } + + [SkippableFact] + public unsafe void ImportArrayFromPythonDevice() + { + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowSchema* cSchema = CArrowSchema.Create(); + + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic pyArray = pa.array(new[] { "hello", "world", null, "foo", "bar" }); + + long deviceArrayPtr = ((IntPtr)cDeviceArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); + pyArray._export_to_c_device(deviceArrayPtr, schemaPtr); + } + + // Verify device fields set by PyArrow + Assert.Equal(ArrowDeviceType.Cpu, cDeviceArray->device_type); + Assert.Equal(-1, cDeviceArray->device_id); + Assert.True(cDeviceArray->sync_event == null); + + ArrowType type = CArrowSchemaImporter.ImportType(cSchema); + IArrowArray importedArray = CArrowDeviceArrayImporter.ImportArray(cDeviceArray, type); + StringArray importedStrings = (StringArray)importedArray; + + Assert.Equal(5, importedStrings.Length); + Assert.Equal("hello", importedStrings.GetString(0)); + Assert.Equal("world", importedStrings.GetString(1)); + 
Assert.Null(importedStrings.GetString(2)); + Assert.Equal("foo", importedStrings.GetString(3)); + Assert.Equal("bar", importedStrings.GetString(4)); + + CArrowDeviceArray.Free(cDeviceArray); + } + + [SkippableFact] + public unsafe void ExportRecordBatchToDeviceAndImportInPython() + { + RecordBatch batch = GetTestRecordBatch(); + dynamic pyBatch = GetPythonRecordBatch(); + + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowDeviceArrayExporter.ExportRecordBatch(batch, cDeviceArray); + + CArrowSchema* cSchema = CArrowSchema.Create(); + CArrowSchemaExporter.ExportSchema(batch.Schema, cSchema); + + long deviceArrayPtr = ((IntPtr)cDeviceArray).ToInt64(); + long schemaPtr = ((IntPtr)cSchema).ToInt64(); + + using (Py.GIL()) + { + dynamic pa = Py.Import("pyarrow"); + dynamic exportedPyBatch = pa.RecordBatch._import_from_c_device(deviceArrayPtr, schemaPtr); + Assert.True(exportedPyBatch == pyBatch); + } + + CArrowDeviceArray.Free(cDeviceArray); + CArrowSchema.Free(cSchema); + } + + [SkippableFact] + public unsafe void ImportRecordBatchFromPythonDevice() + { + CArrowDeviceArray* cDeviceArray = CArrowDeviceArray.Create(); + CArrowSchema* cSchema = CArrowSchema.Create(); + Review Comment: `cSchema` is allocated via `CArrowSchema.Create()` but never freed in this test. Please add `CArrowSchema.Free(cSchema)` once you're done importing the schema to avoid leaking the unmanaged allocation. ########## src/Apache.Arrow/C/CArrowDeviceArrayStreamExporter.cs: ########## @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Runtime.InteropServices; +using Apache.Arrow.Ipc; + +namespace Apache.Arrow.C +{ + public static class CArrowDeviceArrayStreamExporter + { +#if NET5_0_OR_GREATER + private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, CArrowSchema*, int> GetSchemaPtr => &GetSchema; + private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, CArrowDeviceArray*, int> GetNextPtr => &GetNext; + private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, byte*> GetLastErrorPtr => &GetLastError; + private static unsafe delegate* unmanaged<CArrowDeviceArrayStream*, void> ReleasePtr => &Release; +#else + internal unsafe delegate int GetSchemaDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowSchema* cSchema); + private static unsafe NativeDelegate<GetSchemaDeviceArrayStream> s_getSchema = new NativeDelegate<GetSchemaDeviceArrayStream>(GetSchema); + private static unsafe IntPtr GetSchemaPtr => s_getSchema.Pointer; + internal unsafe delegate int GetNextDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowDeviceArray* cDeviceArray); + private static unsafe NativeDelegate<GetNextDeviceArrayStream> s_getNext = new NativeDelegate<GetNextDeviceArrayStream>(GetNext); + private static unsafe IntPtr GetNextPtr => s_getNext.Pointer; + internal unsafe delegate byte* GetLastErrorDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream); + private static unsafe NativeDelegate<GetLastErrorDeviceArrayStream> s_getLastError = new NativeDelegate<GetLastErrorDeviceArrayStream>(GetLastError); + 
private static unsafe IntPtr GetLastErrorPtr => s_getLastError.Pointer; + internal unsafe delegate void ReleaseDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream); + private static unsafe NativeDelegate<ReleaseDeviceArrayStream> s_release = new NativeDelegate<ReleaseDeviceArrayStream>(Release); + private static unsafe IntPtr ReleasePtr => s_release.Pointer; +#endif + + /// <summary> + /// Export an <see cref="IArrowArrayStream"/> to a <see cref="CArrowDeviceArrayStream"/>. + /// </summary> + /// <param name="arrayStream">The array stream to export</param> + /// <param name="deviceArrayStream">An allocated but uninitialized CArrowDeviceArrayStream pointer.</param> + /// <example> + /// <code> + /// CArrowDeviceArrayStream* exportPtr = CArrowDeviceArrayStream.Create(); + /// CArrowDeviceArrayStreamExporter.ExportArrayStream(arrayStream, exportPtr); + /// foreign_import_function(exportPtr); + /// </code> + /// </example> + public static unsafe void ExportArrayStream(IArrowArrayStream arrayStream, CArrowDeviceArrayStream* deviceArrayStream) + { + if (arrayStream == null) + { + throw new ArgumentNullException(nameof(arrayStream)); + } + if (deviceArrayStream == null) + { + throw new ArgumentNullException(nameof(deviceArrayStream)); + } + + deviceArrayStream->device_type = ArrowDeviceType.Cpu; + deviceArrayStream->private_data = ExportedDeviceArrayStream.Export(arrayStream); + deviceArrayStream->get_schema = GetSchemaPtr; + deviceArrayStream->get_next = GetNextPtr; + deviceArrayStream->get_last_error = GetLastErrorPtr; + deviceArrayStream->release = ReleasePtr; + } + +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static int GetSchema(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowSchema* cSchema) + { + ExportedDeviceArrayStream stream = null; + try + { + stream = ExportedDeviceArrayStream.FromPointer(cDeviceArrayStream->private_data); + CArrowSchemaExporter.ExportSchema(stream.ArrowArrayStream.Schema, cSchema); + return 
stream.ClearError(); + } + catch (Exception ex) + { + return stream?.SetError(ex) ?? ExportedDeviceArrayStream.EOTHER; + } + } + +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static int GetNext(CArrowDeviceArrayStream* cDeviceArrayStream, CArrowDeviceArray* cDeviceArray) + { + ExportedDeviceArrayStream stream = null; + try + { + cDeviceArray->array.release = default; + stream = ExportedDeviceArrayStream.FromPointer(cDeviceArrayStream->private_data); + RecordBatch recordBatch = stream.ArrowArrayStream.ReadNextRecordBatchAsync().Result; + if (recordBatch != null) + { + CArrowDeviceArrayExporter.ExportRecordBatch(recordBatch, cDeviceArray); Review Comment: `ReadNextRecordBatchAsync().Result` wraps exceptions in `AggregateException` and makes failures harder to debug. Also, `RecordBatch` is disposable and should be disposed after exporting (the exporter keeps buffers alive via ref counting). Consider `GetAwaiter().GetResult()` plus disposing the batch in a `using`/`finally`. ```suggestion RecordBatch recordBatch = stream.ArrowArrayStream.ReadNextRecordBatchAsync().GetAwaiter().GetResult(); if (recordBatch != null) { try { CArrowDeviceArrayExporter.ExportRecordBatch(recordBatch, cDeviceArray); } finally { recordBatch.Dispose(); } ``` ########## test/Apache.Arrow.Tests/PythonNetFixture.cs: ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Runtime.InteropServices; +using Python.Runtime; +using Xunit; + +namespace Apache.Arrow.Tests +{ + /// <summary> + /// Shared fixture for Python.NET initialization. Python.NET can only be initialized + /// once per process, so this fixture is shared across all test classes via + /// <see cref="PythonNetCollection"/>. + /// </summary> + public class PythonNetFixture : IDisposable + { + public bool Initialized { get; } + + public bool VersionMismatch { get; } + + public PythonNetFixture() + { + bool pythonSet = Environment.GetEnvironmentVariable("PYTHONNET_PYDLL") != null; + if (!pythonSet) + { + Initialized = false; + return; + } + + try + { + PythonEngine.Initialize(); + } + catch (NotSupportedException e) when (e.Message.Contains("Python ABI ") && e.Message.Contains("not supported")) + { + // An unsupported version of Python is being used + Initialized = false; + VersionMismatch = true; + return; + } + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows) && + PythonEngine.PythonPath.IndexOf("dlls", StringComparison.OrdinalIgnoreCase) < 0) + { + dynamic sys = Py.Import("sys"); + sys.path.append(Path.Combine(Path.GetDirectoryName(Environment.GetEnvironmentVariable("PYTHONNET_PYDLL")), "DLLs")); Review Comment: On Windows this modifies `sys.path` via `Py.Import("sys")` without explicitly acquiring the GIL. To avoid intermittent "must hold the GIL" failures, wrap this block in `using (Py.GIL()) { ... }`. 
```suggestion using (Py.GIL()) { dynamic sys = Py.Import("sys"); sys.path.append(Path.Combine(Path.GetDirectoryName(Environment.GetEnvironmentVariable("PYTHONNET_PYDLL")), "DLLs")); } ``` ########## src/Apache.Arrow/C/CArrowDeviceArrayStreamImporter.cs: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; + +namespace Apache.Arrow.C +{ + public static class CArrowDeviceArrayStreamImporter + { + /// <summary> + /// Import C pointer as an <see cref="IArrowArrayStream"/>. + /// </summary> + /// <remarks> + /// This will call the release callback on the passed struct if the function fails. + /// Otherwise, the release callback is called when the IArrowArrayStream is disposed. + /// Only CPU device streams are supported. 
+ /// </remarks> + /// <param name="ptr">The pointer to the device array stream being imported</param> + /// <returns>The imported C# array stream</returns> + public static unsafe IArrowArrayStream ImportDeviceArrayStream(CArrowDeviceArrayStream* ptr) + { + if (ptr == null) + { + throw new ArgumentNullException(nameof(ptr)); + } + if (ptr->device_type != ArrowDeviceType.Cpu) + { + throw new NotSupportedException( + $"Importing device array streams from device type {ptr->device_type} is not supported. Only CPU streams can be imported."); + } + + return new ImportedArrowDeviceArrayStream(ptr); + } + + private sealed unsafe class ImportedArrowDeviceArrayStream : IArrowArrayStream + { + private readonly CArrowDeviceArrayStream _cDeviceArrayStream; + private readonly Schema _schema; + private bool _disposed; + + internal static string GetLastError(CArrowDeviceArrayStream* stream, int errno) + { +#if NET5_0_OR_GREATER + byte* error = stream->get_last_error(stream); +#else + byte* error = Marshal.GetDelegateForFunctionPointer<CArrowDeviceArrayStreamExporter.GetLastErrorDeviceArrayStream>(stream->get_last_error)(stream); +#endif + if (error == null) + { + return $"Device array stream operation failed with no message. Error code: {errno}"; + } + return StringUtil.PtrToStringUtf8(error); + } + + public ImportedArrowDeviceArrayStream(CArrowDeviceArrayStream* cDeviceArrayStream) + { + if (cDeviceArrayStream == null) + { + throw new ArgumentNullException(nameof(cDeviceArrayStream)); + } + if (cDeviceArrayStream->release == default) + { + throw new ArgumentException("Tried to import a device array stream that has already been released.", nameof(cDeviceArrayStream)); + } Review Comment: Only `release` is validated, but `get_schema`/`get_next` are also required callbacks. If either is null/default, the subsequent unmanaged call will crash. Please validate these pointers up-front and throw a clear `ArgumentException` if missing. 
```suggestion } if (cDeviceArrayStream->get_schema == default) { throw new ArgumentException("Tried to import a device array stream that is missing the required 'get_schema' callback.", nameof(cDeviceArrayStream)); } if (cDeviceArrayStream->get_next == default) { throw new ArgumentException("Tried to import a device array stream that is missing the required 'get_next' callback.", nameof(cDeviceArrayStream)); } ``` ########## src/Apache.Arrow/C/CArrowDeviceArrayStreamImporter.cs: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; + +namespace Apache.Arrow.C +{ + public static class CArrowDeviceArrayStreamImporter + { + /// <summary> + /// Import C pointer as an <see cref="IArrowArrayStream"/>. + /// </summary> + /// <remarks> + /// This will call the release callback on the passed struct if the function fails. + /// Otherwise, the release callback is called when the IArrowArrayStream is disposed. + /// Only CPU device streams are supported. 
+ /// </remarks> + /// <param name="ptr">The pointer to the device array stream being imported</param> + /// <returns>The imported C# array stream</returns> + public static unsafe IArrowArrayStream ImportDeviceArrayStream(CArrowDeviceArrayStream* ptr) + { + if (ptr == null) + { + throw new ArgumentNullException(nameof(ptr)); + } + if (ptr->device_type != ArrowDeviceType.Cpu) + { + throw new NotSupportedException( + $"Importing device array streams from device type {ptr->device_type} is not supported. Only CPU streams can be imported."); + } + + return new ImportedArrowDeviceArrayStream(ptr); + } + + private sealed unsafe class ImportedArrowDeviceArrayStream : IArrowArrayStream + { + private readonly CArrowDeviceArrayStream _cDeviceArrayStream; + private readonly Schema _schema; + private bool _disposed; + + internal static string GetLastError(CArrowDeviceArrayStream* stream, int errno) + { +#if NET5_0_OR_GREATER + byte* error = stream->get_last_error(stream); +#else Review Comment: `GetLastError` calls `get_last_error` unconditionally. The C stream interface only makes the detailed error *text* optional: the callback may return NULL, which this code already handles. The callback pointer itself is expected to be set. Still, if a non-conforming producer leaves `get_last_error` null/default, this will AV (NET5 function pointer) or throw (pre-NET5) and mask the original error being reported. Please guard for `stream->get_last_error == default` and fall back to a generic message including `errno`. ```suggestion #if NET5_0_OR_GREATER if (stream->get_last_error == default) { return $"Device array stream operation failed. Error code: {errno}"; } byte* error = stream->get_last_error(stream); #else if (stream->get_last_error == default) { return $"Device array stream operation failed. Error code: {errno}"; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
