westonpace commented on code in PR #35496:
URL: https://github.com/apache/arrow/pull/35496#discussion_r1196750655


##########
csharp/src/Apache.Arrow/Arrays/NullArray.cs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using Apache.Arrow.Types;
+using Apache.Arrow.Memory;
+
+namespace Apache.Arrow
+{
+    public class NullArray : IArrowArray
+    {
+        public class Builder : IArrowArrayBuilder<NullArray, Builder>
+        {
+            private int _length;
+
+            public int Length => _length;
+            public int Capacity => _length;
+            public int NullCount => _length;
+
+            public Builder()
+            {
+            }
+
+            public Builder AppendNull()
+            {
+                _length++;
+                return this;
+            }
+
+            public NullArray Build(MemoryAllocator allocator = default)
+            {
+                return new NullArray(_length);
+            }
+
+            public Builder Clear()
+            {
+                _length = 0;
+                return this;
+            }
+
+            public Builder Reserve(int capacity)
+            {
+                if (capacity < 0)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(capacity));
+                }
+
+                return this;
+            }
+
+            public Builder Resize(int length)
+            {
+                if (length < 0)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(length));
+                }
+
+                _length = length;
+                return this;
+            }
+
+            private void CheckIndex(int index)
+            {
+                if (index < 0 || index >= Length)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(index));
+                }
+            }
+        }
+
+        public ArrayData Data { get; }
+
+        public NullArray(ArrayData data)
+        {
+            data.EnsureDataType(ArrowTypeId.Null);
+            data.EnsureBufferCount(0);
+            Data = data;

Review Comment:
   Minor nit: should we ensure the null count and length are equal?



##########
csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.C
+{
+    public static class CArrowArrayStreamImporter
+    {
+        /// <summary>
+        /// Import C pointer as an <see cref="IArrowArrayStream"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback on the passed struct if the 
function fails.
+        /// Otherwise, the release callback is called when the 
IArrowArrayStream is disposed.
+        /// </remarks>
+        /// <examples>
+        /// Typically, you will allocate an uninitialized CArrowArrayStream 
pointer,
+        /// pass that to external function, and then use this method to import
+        /// the result.
+        /// 
+        /// <code>
+        /// CArrowArrayStream* importedPtr = CArrowArrayStream.Create();
+        /// foreign_export_function(importedPtr);
+        /// IArrowArrayStream importedStream = 
CArrowArrayStreamImporter.ImportStream(importedPtr);
+        /// </code>
+        /// </examples>
+        public static unsafe IArrowArrayStream 
ImportArrayStream(CArrowArrayStream* ptr)
+        {
+            return new ImportedArrowArrayStream(ptr);
+        }
+
+        private sealed unsafe class ImportedArrowArrayStream : 
IArrowArrayStream
+        {
+            private readonly CArrowArrayStream* _cArrayStream;
+            private readonly Schema _schema;
+            private bool _disposed;
+
+            public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
+            {
+                if (cArrayStream == null)
+                {
+                    throw new ArgumentNullException(nameof(cArrayStream));
+                }
+                _cArrayStream = cArrayStream;
+                if (_cArrayStream->release == null)
+                {
+                    throw new ArgumentException("Tried to import an array 
stream that has already been released.", nameof(cArrayStream));
+                }
+
+                CArrowSchema* cSchema = CArrowSchema.Create();
+                try
+                {
+                    if (_cArrayStream->get_schema(_cArrayStream, cSchema) != 0)
+                    {
+                        throw new Exception("This needs to be better");
+                    }
+                    _schema = CArrowSchemaImporter.ImportSchema(cSchema);
+                }
+                finally
+                {
+                    if (_schema == null)
+                    {
+                        CArrowSchema.Free(cSchema);
+                    }
+                }
+            }
+
+            ~ImportedArrowArrayStream()
+            {
+                Dispose();
+            }
+
+            public Schema Schema => _schema;
+
+            public ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                if (_disposed)
+                {
+                    throw new 
ObjectDisposedException(typeof(ImportedArrowArrayStream).Name);
+                }
+
+                RecordBatch result = null;
+                CArrowArray* cArray = CArrowArray.Create();
+                try
+                {
+                    if (_cArrayStream->get_next(_cArrayStream, cArray) != 0)
+                    {
+                        throw new Exception("This too needs to be better");
+                    }
+                    if (cArray->release != null)
+                    {
+                        result = CArrowArrayImporter.ImportRecordBatch(cArray, 
_schema);
+                    }
+                }
+                finally
+                {
+                    if (result == null)
+                    {
+                        CArrowArray.Free(cArray);
+                    }
+                }
+
+                return new ValueTask<RecordBatch>(result);

Review Comment:
   You could, potentially, accept a task scheduler and schedule a new task to 
call `get_next`, so that this method is genuinely asynchronous.  That should 
definitely be a follow-up, though, and whether it is worthwhile probably depends 
on what you're interfacing with (e.g. is the underlying stream slow because it 
performs I/O?)



##########
csharp/src/Apache.Arrow/Arrays/NullArray.cs:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using Apache.Arrow.Types;
+using Apache.Arrow.Memory;
+
+namespace Apache.Arrow
+{
+    public class NullArray : IArrowArray
+    {
+        public class Builder : IArrowArrayBuilder<NullArray, Builder>
+        {
+            private int _length;
+
+            public int Length => _length;
+            public int Capacity => _length;
+            public int NullCount => _length;
+
+            public Builder()
+            {
+            }
+
+            public Builder AppendNull()
+            {
+                _length++;
+                return this;
+            }
+
+            public NullArray Build(MemoryAllocator allocator = default)
+            {
+                return new NullArray(_length);
+            }
+
+            public Builder Clear()
+            {
+                _length = 0;
+                return this;
+            }
+
+            public Builder Reserve(int capacity)
+            {
+                if (capacity < 0)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(capacity));
+                }
+
+                return this;
+            }
+
+            public Builder Resize(int length)
+            {
+                if (length < 0)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(length));
+                }
+
+                _length = length;
+                return this;
+            }
+
+            private void CheckIndex(int index)
+            {
+                if (index < 0 || index >= Length)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(index));
+                }
+            }

Review Comment:
   Is this function used?



##########
csharp/src/Apache.Arrow/C/CArrowArrayImporter.cs:
##########
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Apache.Arrow.Memory;
+using Apache.Arrow.Types;
+
+namespace Apache.Arrow.C
+{
+    public static class CArrowArrayImporter
+    {
+        /// <summary>
+        /// Import C pointer as an <see cref="IArrowArray"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback once all of the buffers in the 
returned
+        /// IArrowArray are disposed.
+        /// </remarks>
+        /// <examples>
+        /// Typically, you will allocate an uninitialized CArrowArray pointer,
+        /// pass that to external function, and then use this method to import
+        /// the result.
+        /// 
+        /// <code>
+        /// CArrowArray* importedPtr = CArrowArray.Create();
+        /// foreign_export_function(importedPtr);
+        /// IArrowArray importedArray = 
CArrowArrayImporter.ImportArray(importedPtr);
+        /// </code>
+        /// </examples>
+        public static unsafe IArrowArray ImportArray(CArrowArray* ptr, 
IArrowType type)
+        {
+            ImportedArrowArray importedArray = null;
+            try
+            {
+                importedArray = new ImportedArrowArray(ptr);
+                return importedArray.GetAsArray(type);
+            }
+            finally
+            {
+                importedArray?.Release();
+            }
+        }
+
+        /// <summary>
+        /// Import C pointer as a <see cref="RecordBatch"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback once all of the buffers in the 
returned
+        /// RecordBatch are disposed.
+        /// </remarks>
+        /// <examples>
+        /// Typically, you will allocate an uninitialized CArrowArray pointer,
+        /// pass that to external function, and then use this method to import
+        /// the result.
+        /// 
+        /// <code>
+        /// CArrowArray* importedPtr = CArrowArray.Create();
+        /// foreign_export_function(importedPtr);
+        /// RecordBatch batch = 
CArrowArrayImporter.ImportRecordBatch(importedPtr, schema);
+        /// </code>
+        /// </examples>
+        public static unsafe RecordBatch ImportRecordBatch(CArrowArray* ptr, 
Schema schema)
+        {
+            ImportedArrowArray importedArray = null;
+            try
+            {
+                importedArray = new ImportedArrowArray(ptr);
+                return importedArray.GetAsRecordBatch(schema);
+            }
+            finally
+            {
+                importedArray?.Release();
+            }
+        }
+
+        private sealed unsafe class ImportedArrowArray : 
ImportedAllocationOwner
+        {
+            private readonly CArrowArray* _cArray;
+
+            public ImportedArrowArray(CArrowArray* cArray)
+            {
+                if (cArray == null)
+                {
+                    throw new ArgumentNullException(nameof(cArray));
+                }
+                _cArray = cArray;
+                if (_cArray->release == null)
+                {
+                    throw new ArgumentException("Tried to import an array that 
has already been released.", nameof(cArray));
+                }
+            }
+
+            protected override void FinalRelease()
+            {
+                if (_cArray->release != null)
+                {
+                    _cArray->release(_cArray);

Review Comment:
   Do you need to free `_cArray`?
   
   Never mind.  I see that `_cArray` comes from the caller of `ImportArray`, and 
the caller is responsible for freeing it (which it seems they can always do as 
soon as the call to `ImportArray` has finished).



##########
csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.C
+{
+    public static class CArrowArrayStreamImporter
+    {
+        /// <summary>
+        /// Import C pointer as an <see cref="IArrowArrayStream"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback on the passed struct if the 
function fails.
+        /// Otherwise, the release callback is called when the 
IArrowArrayStream is disposed.
+        /// </remarks>
+        /// <examples>
+        /// Typically, you will allocate an uninitialized CArrowArrayStream 
pointer,
+        /// pass that to external function, and then use this method to import
+        /// the result.
+        /// 
+        /// <code>
+        /// CArrowArrayStream* importedPtr = CArrowArrayStream.Create();
+        /// foreign_export_function(importedPtr);
+        /// IArrowArrayStream importedStream = 
CArrowArrayStreamImporter.ImportStream(importedPtr);
+        /// </code>
+        /// </examples>
+        public static unsafe IArrowArrayStream 
ImportArrayStream(CArrowArrayStream* ptr)
+        {
+            return new ImportedArrowArrayStream(ptr);
+        }
+
+        private sealed unsafe class ImportedArrowArrayStream : 
IArrowArrayStream
+        {
+            private readonly CArrowArrayStream* _cArrayStream;
+            private readonly Schema _schema;
+            private bool _disposed;
+
+            public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
+            {
+                if (cArrayStream == null)
+                {
+                    throw new ArgumentNullException(nameof(cArrayStream));
+                }
+                _cArrayStream = cArrayStream;
+                if (_cArrayStream->release == null)
+                {
+                    throw new ArgumentException("Tried to import an array 
stream that has already been released.", nameof(cArrayStream));
+                }
+
+                CArrowSchema* cSchema = CArrowSchema.Create();
+                try
+                {
+                    if (_cArrayStream->get_schema(_cArrayStream, cSchema) != 0)
+                    {
+                        throw new Exception("This needs to be better");
+                    }
+                    _schema = CArrowSchemaImporter.ImportSchema(cSchema);
+                }
+                finally
+                {
+                    if (_schema == null)
+                    {
+                        CArrowSchema.Free(cSchema);
+                    }
+                }
+            }
+
+            ~ImportedArrowArrayStream()
+            {
+                Dispose();
+            }
+
+            public Schema Schema => _schema;
+
+            public ValueTask<RecordBatch> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                if (_disposed)
+                {
+                    throw new 
ObjectDisposedException(typeof(ImportedArrowArrayStream).Name);
+                }
+
+                RecordBatch result = null;
+                CArrowArray* cArray = CArrowArray.Create();
+                try
+                {
+                    if (_cArrayStream->get_next(_cArrayStream, cArray) != 0)
+                    {
+                        throw new Exception("This too needs to be better");

Review Comment:
   Same as for the `get_schema` error message in the constructor.  Let's include 
the error number returned by `get_next`.



##########
csharp/src/Apache.Arrow/Memory/ImportedAllocationOwner.cs:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.Threading;
+
+namespace Apache.Arrow.Memory
+{
+    internal abstract class ImportedAllocationOwner : INativeAllocationOwner
+    {
+        long _referenceCount;
+        long _managedMemory;
+
+        protected ImportedAllocationOwner()
+        {
+            _referenceCount = 1;
+        }
+
+        public IMemoryOwner<byte> AddMemory(IntPtr ptr, int offset, int length)
+        {
+            if (_referenceCount <= 0)
+            {
+                throw new 
ObjectDisposedException(typeof(ImportedAllocationOwner).Name);
+            }
+
+            NativeMemoryManager memory = new NativeMemoryManager(this, ptr, 
offset, length);
+            Interlocked.Increment(ref _referenceCount);
+            Interlocked.Add(ref _managedMemory, length);

Review Comment:
   Minor nit: you could probably move this statement inside the if right below 
it.



##########
csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs:
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+
+namespace Apache.Arrow.Memory
+{
+    internal sealed class ExportedAllocationOwner : INativeAllocationOwner, 
IDisposable
+    {
+        private readonly List<IntPtr> _pointers = new List<IntPtr>();
+        private int _allocationSize;

Review Comment:
   Is this used for anything?



##########
csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs:
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.C
+{
+    public static class CArrowArrayStreamImporter
+    {
+        /// <summary>
+        /// Import C pointer as an <see cref="IArrowArrayStream"/>.
+        /// </summary>
+        /// <remarks>
+        /// This will call the release callback on the passed struct if the 
function fails.
+        /// Otherwise, the release callback is called when the 
IArrowArrayStream is disposed.
+        /// </remarks>
+        /// <examples>
+        /// Typically, you will allocate an uninitialized CArrowArrayStream 
pointer,
+        /// pass that to external function, and then use this method to import
+        /// the result.
+        /// 
+        /// <code>
+        /// CArrowArrayStream* importedPtr = CArrowArrayStream.Create();
+        /// foreign_export_function(importedPtr);
+        /// IArrowArrayStream importedStream = 
CArrowArrayStreamImporter.ImportStream(importedPtr);
+        /// </code>
+        /// </examples>
+        public static unsafe IArrowArrayStream 
ImportArrayStream(CArrowArrayStream* ptr)
+        {
+            return new ImportedArrowArrayStream(ptr);
+        }
+
+        private sealed unsafe class ImportedArrowArrayStream : 
IArrowArrayStream
+        {
+            private readonly CArrowArrayStream* _cArrayStream;
+            private readonly Schema _schema;
+            private bool _disposed;
+
+            public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
+            {
+                if (cArrayStream == null)
+                {
+                    throw new ArgumentNullException(nameof(cArrayStream));
+                }
+                _cArrayStream = cArrayStream;
+                if (_cArrayStream->release == null)
+                {
+                    throw new ArgumentException("Tried to import an array 
stream that has already been released.", nameof(cArrayStream));
+                }
+
+                CArrowSchema* cSchema = CArrowSchema.Create();
+                try
+                {
+                    if (_cArrayStream->get_schema(_cArrayStream, cSchema) != 0)
+                    {
+                        throw new Exception("This needs to be better");

Review Comment:
   Perhaps, for now, just "Unexpected error received from external stream. 
Errno: {errno}"?  At the very least I think we should include the error number 
in the error message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to