This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6118e240 feat(python): Implement array from buffer for non-CPU arrays
(#550)
6118e240 is described below
commit 6118e24035e3aa00a278dff852734ff4d528a947
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Sep 18 14:00:08 2024 -0500
feat(python): Implement array from buffer for non-CPU arrays (#550)
Requires building with (e.g.) `export
NANOARROW_PYTHON_CUDA=/usr/local/cuda` and a `cupy` install:
```python
import nanoarrow as na
from nanoarrow import device
import cupy as cp
device.c_device_array(cp.array([1, 2, 3]))
#> <nanoarrow.device.CDeviceArray>
#> - device_type: CUDA <2>
#> - device_id: 0
#> - array: <nanoarrow.c_array.CArray int64>
#> - length: 3
#> - offset: 0
#> - null_count: 0
#> - buffers: (0, 133980798058496)
#> - dictionary: NULL
#> - children[0]:
# Also roundtrips
darray = device.c_device_array(cp.array([1, 2, 3]))
cp.from_dlpack(darray.array.view().buffer(1))
#> array([1, 2, 3])
```
---------
Co-authored-by: Dane Pitkin <[email protected]>
---
python/setup.py | 2 +-
python/src/nanoarrow/_array.pxd | 7 +-
python/src/nanoarrow/_array.pyx | 110 ++++++++---
python/src/nanoarrow/_buffer.pxd | 4 +-
python/src/nanoarrow/_buffer.pyx | 206 ++++++++++++++++++---
python/src/nanoarrow/_device.pxd | 9 +
python/src/nanoarrow/_device.pyx | 33 ++++
python/src/nanoarrow/_repr_utils.py | 3 +-
python/src/nanoarrow/_utils.pxd | 4 +
python/src/nanoarrow/_utils.pyx | 26 +++
python/src/nanoarrow/c_array.py | 31 +++-
python/src/nanoarrow/c_buffer.py | 18 +-
python/src/nanoarrow/device.py | 37 +++-
.../nanoarrow/_device.pxd => tests/conftest.py} | 14 +-
python/tests/test_c_array.py | 93 ++++++++++
python/tests/test_device.py | 21 +++
python/tests/test_dlpack.py | 28 ++-
17 files changed, 569 insertions(+), 77 deletions(-)
diff --git a/python/setup.py b/python/setup.py
index 45c37879..1b776247 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -85,7 +85,7 @@ if cuda_toolkit_root:
device_include_dirs.append(str(include_dir))
device_libraries.append("cuda")
- device_define_macros.append(("NANOARROW_DEVICE_WITH_CUDA", 1))
+ extra_define_macros.append(("NANOARROW_DEVICE_WITH_CUDA", 1))
# Library might be already in a system library directory such that no -L
flag
# is needed
diff --git a/python/src/nanoarrow/_array.pxd b/python/src/nanoarrow/_array.pxd
index 01712efc..e564d969 100644
--- a/python/src/nanoarrow/_array.pxd
+++ b/python/src/nanoarrow/_array.pxd
@@ -29,7 +29,7 @@ from nanoarrow_device_c cimport (
ArrowDeviceType
)
-from nanoarrow._device cimport Device
+from nanoarrow._device cimport CSharedSyncEvent
from nanoarrow._schema cimport CSchema
@@ -39,15 +39,16 @@ cdef class CArray:
cdef CSchema _schema
cdef ArrowDeviceType _device_type
cdef int _device_id
+ cdef void* _sync_event
- cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id)
+ cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id,
void* sync_event)
cdef class CArrayView:
cdef object _base
cdef object _array_base
cdef ArrowArrayView* _ptr
- cdef Device _device
+ cdef CSharedSyncEvent _event
cdef class CDeviceArray:
cdef object _base
diff --git a/python/src/nanoarrow/_array.pyx b/python/src/nanoarrow/_array.pyx
index e79aaa4c..3baaf16e 100644
--- a/python/src/nanoarrow/_array.pyx
+++ b/python/src/nanoarrow/_array.pyx
@@ -67,7 +67,6 @@ from nanoarrow_c cimport (
NANOARROW_OK,
)
-
from nanoarrow_device_c cimport (
ARROW_DEVICE_CPU,
ArrowDeviceType,
@@ -75,7 +74,7 @@ from nanoarrow_device_c cimport (
ArrowDeviceArrayInit,
)
-from nanoarrow._device cimport Device
+from nanoarrow._device cimport Device, CSharedSyncEvent
from nanoarrow._buffer cimport CBuffer, CBufferView
from nanoarrow._schema cimport CSchema, CLayout
@@ -107,7 +106,7 @@ cdef class CArrayView:
def __cinit__(self, object base, uintptr_t addr):
self._base = base
self._ptr = <ArrowArrayView*>addr
- self._device = DEVICE_CPU
+ self._event = CSharedSyncEvent(DEVICE_CPU)
def _set_array(self, CArray array, Device device=DEVICE_CPU):
cdef Error error = Error()
@@ -120,7 +119,8 @@ cdef class CArrayView:
error.raise_message_not_ok("ArrowArrayViewSetArray()", code)
self._array_base = array._base
- self._device = device
+ self._event = CSharedSyncEvent(device, <uintptr_t>array._sync_event)
+
return self
@property
@@ -160,7 +160,7 @@ cdef class CArrayView:
self._ptr.null_count = 0
elif validity_bits == NULL:
self._ptr.null_count = 0
- elif self._device is DEVICE_CPU:
+ elif self._event.device is DEVICE_CPU:
self._ptr.null_count = ArrowArrayViewComputeNullCount(self._ptr)
return self._ptr.null_count
@@ -178,7 +178,8 @@ cdef class CArrayView:
<uintptr_t>self._ptr.children[i]
)
- child._device = self._device
+ child._event = self._event
+
return child
@property
@@ -227,7 +228,7 @@ cdef class CArrayView:
buffer_view.size_bytes,
self._ptr.layout.buffer_data_type[i],
self._ptr.layout.element_size_bits[i],
- self._device
+ self._event
)
@property
@@ -239,11 +240,14 @@ cdef class CArrayView:
def dictionary(self):
if self._ptr.dictionary == NULL:
return None
- else:
- return CArrayView(
- self,
- <uintptr_t>self._ptr.dictionary
- )
+
+ cdef CArrayView dictionary = CArrayView(
+ self,
+ <uintptr_t>self._ptr.dictionary
+ )
+ dictionary._event = self._event
+
+ return dictionary
def __repr__(self):
return _repr_utils.array_view_repr(self)
@@ -288,11 +292,13 @@ cdef class CArray:
self._ptr = <ArrowArray*>addr
self._schema = schema
self._device_type = ARROW_DEVICE_CPU
- self._device_id = 0
+ self._device_id = -1
+ self._sync_event = NULL
- cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id):
+ cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id,
void* sync_event):
self._device_type = device_type
self._device_id = device_id
+ self._sync_event = sync_event
@staticmethod
def _import_from_c_capsule(schema_capsule, array_capsule) -> CArray:
@@ -350,7 +356,8 @@ cdef class CArray:
c_array_out.offset = c_array_out.offset + start
c_array_out.length = stop - start
cdef CArray out = CArray(base, <uintptr_t>c_array_out, self._schema)
- out._set_device(self._device_type, self._device_id)
+ out._set_device(self._device_type, self._device_id, self._sync_event)
+
return out
def __arrow_c_array__(self, requested_schema=None):
@@ -466,7 +473,7 @@ cdef class CArray:
<uintptr_t>self._ptr.children[i],
self._schema.child(i)
)
- out._set_device(self._device_type, self._device_id)
+ out._set_device(self._device_type, self._device_id, self._sync_event)
return out
@property
@@ -480,7 +487,7 @@ cdef class CArray:
cdef CArray out
if self._ptr.dictionary != NULL:
out = CArray(self, <uintptr_t>self._ptr.dictionary,
self._schema.dictionary)
- out._set_device(self._device_type, self._device_id)
+ out._set_device(self._device_type, self._device_id,
self._sync_event)
return out
else:
return None
@@ -497,22 +504,24 @@ cdef class CArrayBuilder:
"""
cdef CArray c_array
cdef ArrowArray* _ptr
+ cdef Device _device
cdef bint _can_validate
- def __cinit__(self, CArray array):
+ def __cinit__(self, CArray array, Device device=DEVICE_CPU):
self.c_array = array
self._ptr = array._ptr
- self._can_validate = True
+ self._device = device
+ self._can_validate = device is DEVICE_CPU
@staticmethod
- def allocate():
+ def allocate(Device device=DEVICE_CPU):
"""Create a CArrayBuilder
Allocates memory for an ArrowArray and populates it with nanoarrow's
ArrowArray private_data/release callback implementation. This should
usually be followed by :meth:`init_from_type` or
:meth:`init_from_schema`.
"""
- return CArrayBuilder(CArray.allocate(CSchema.allocate()))
+ return CArrayBuilder(CArray.allocate(CSchema.allocate()), device)
def is_empty(self) -> bool:
"""Check if any items have been appended to this builder"""
@@ -550,6 +559,9 @@ cdef class CArrayBuilder:
Calling this method is required to produce a valid array prior to
calling
:meth:`append_strings` or `append_bytes`.
"""
+ if self._device != DEVICE_CPU:
+ raise ValueError("Can't append to non-CPU array")
+
cdef int code = ArrowArrayStartAppending(self._ptr)
Error.raise_error_not_ok("ArrowArrayStartAppending()", code)
return self
@@ -617,7 +629,11 @@ cdef class CArrayBuilder:
return self
def resolve_null_count(self) -> CArrayBuilder:
- """Ensure the output null count is synchronized with existing
buffers"""
+ """Ensure the output null count is synchronized with existing buffers
+
+ Note that this will not attempt to access non-CPU buffers such that
+ :attr:`null_count` might still be -1 after calling this method.
+ """
self.c_array._assert_valid()
# This doesn't apply to unions. We currently don't have a schema view
@@ -636,6 +652,10 @@ cdef class CArrayBuilder:
self._ptr.null_count = 0
return self
+ # Don't attempt to access a non-cpu buffer
+ if self._device != DEVICE_CPU:
+ return self
+
# From _ArrowBytesForBits(), which is not included in nanoarrow_c.pxd
# because it's an internal inline function.
cdef int64_t bits = self._ptr.offset + self._ptr.length
@@ -669,6 +689,14 @@ cdef class CArrayBuilder:
if i < 0 or i > 3:
raise IndexError("i must be >= 0 and <= 3")
+ if buffer._device != self._device:
+ raise ValueError(
+ f"Builder device
({self._device.device_type}/{self._device.device_id})"
+ " and buffer device "
+ f"({buffer._device.device_type}/{buffer._device.device_id})"
+ " are not identical"
+ )
+
self.c_array._assert_valid()
if not move:
buffer = CBuffer.from_pybuffer(buffer)
@@ -694,6 +722,26 @@ cdef class CArrayBuilder:
if child._ptr.release != NULL:
ArrowArrayRelease(child._ptr)
+ if (
+ self._device.device_type_id != c_array.device_type_id
+ or self._device.device_id != c_array.device_id
+ ):
+ raise ValueError(
+ f"Builder device
({self._device.device_type}/{self._device.device_id})"
+ " and child device "
+ f"({c_array.device_type}/{c_array.device_id}) are not
identical"
+ )
+
+ # There is probably a way to avoid a full synchronize for each child
+ # (e.g., perhaps the ArrayBuilder could allocate a stream to use such
+ # that an event can be allocated on finish_device() and synchronization
+ # could be avoided entirely). Including this for now for safety.
+ cdef CSharedSyncEvent sync = CSharedSyncEvent(
+ self._device,
+ <uintptr_t>c_array._sync_event
+ )
+ sync.synchronize()
+
if not move:
c_array_shallow_copy(c_array._base, c_array._ptr, child._ptr)
else:
@@ -747,6 +795,20 @@ cdef class CArrayBuilder:
return out
+ def finish_device(self):
+ """Finish building this array and export to an ArrowDeviceArray
+
+ Calls :meth:`finish`, propagating device information into an
ArrowDeviceArray.
+ """
+ cdef CArray array = self.finish()
+
+ cdef ArrowDeviceArray* device_array_ptr
+ holder = alloc_c_device_array(&device_array_ptr)
+ cdef int code = ArrowDeviceArrayInit(self._device._ptr,
device_array_ptr, array._ptr, NULL)
+ Error.raise_error_not_ok("ArrowDeviceArrayInit", code)
+
+ return CDeviceArray(holder, <uintptr_t>device_array_ptr, array._schema)
+
cdef class CDeviceArray:
"""Low-level ArrowDeviceArray wrapper
@@ -792,10 +854,8 @@ cdef class CDeviceArray:
@property
def array(self) -> CArray:
- # TODO: We lose access to the sync_event here, so we probably need to
- # synchronize (or propagate it, or somehow prevent data access
downstream)
cdef CArray array = CArray(self, <uintptr_t>&self._ptr.array,
self._schema)
- array._set_device(self._ptr.device_type, self._ptr.device_id)
+ array._set_device(self._ptr.device_type, self._ptr.device_id,
self._ptr.sync_event)
return array
def view(self) -> CArrayView:
diff --git a/python/src/nanoarrow/_buffer.pxd b/python/src/nanoarrow/_buffer.pxd
index 01e966b0..9cd2dafa 100644
--- a/python/src/nanoarrow/_buffer.pxd
+++ b/python/src/nanoarrow/_buffer.pxd
@@ -25,14 +25,14 @@ from nanoarrow_c cimport (
ArrowType,
)
-from nanoarrow._device cimport Device
+from nanoarrow._device cimport Device, CSharedSyncEvent
cdef class CBufferView:
cdef object _base
cdef ArrowBufferView _ptr
cdef ArrowType _data_type
- cdef Device _device
+ cdef CSharedSyncEvent _event
cdef Py_ssize_t _element_size_bits
cdef Py_ssize_t _shape
cdef Py_ssize_t _strides
diff --git a/python/src/nanoarrow/_buffer.pyx b/python/src/nanoarrow/_buffer.pyx
index 8dad50cd..14b63b85 100644
--- a/python/src/nanoarrow/_buffer.pyx
+++ b/python/src/nanoarrow/_buffer.pyx
@@ -51,16 +51,20 @@ from nanoarrow_c cimport (
ArrowBitmapReserve,
ArrowBitmapAppend,
ArrowBitmapAppendUnsafe,
+ ArrowBuffer,
ArrowBufferMove,
)
from nanoarrow_device_c cimport (
ARROW_DEVICE_CPU,
+ ARROW_DEVICE_CUDA,
+ ArrowDevice,
)
from nanoarrow_dlpack cimport (
DLDataType,
DLDevice,
+ DLDeviceType,
DLManagedTensor,
DLTensor,
kDLCPU,
@@ -71,7 +75,7 @@ from nanoarrow_dlpack cimport (
from nanoarrow cimport _utils
from nanoarrow cimport _types
-from nanoarrow._device cimport Device
+from nanoarrow._device cimport CSharedSyncEvent, Device
from struct import unpack_from, iter_unpack, calcsize, Struct
@@ -121,8 +125,36 @@ cdef DLDataType view_to_dlpack_data_type(CBufferView view):
return dtype
-
-cdef object view_to_dlpack(CBufferView view):
+cdef int dlpack_data_type_to_arrow(DLDataType dtype):
+ if dtype.code == kDLInt:
+ if dtype.bits == 8:
+ return _types.INT8
+ elif dtype.bits == 16:
+ return _types.INT16
+ elif dtype.bits == 32:
+ return _types.INT32
+ elif dtype.bits == 64:
+ return _types.INT64
+ elif dtype.code == kDLUInt:
+ if dtype.bits == 8:
+ return _types.UINT8
+ elif dtype.bits == 16:
+ return _types.UINT16
+ elif dtype.bits == 32:
+ return _types.UINT32
+ elif dtype.bits == 64:
+ return _types.UINT64
+ elif dtype.code == kDLFloat:
+ if dtype.bits == 16:
+ return _types.HALF_FLOAT
+ elif dtype.bits == 32:
+ return _types.FLOAT
+ elif dtype.bits == 64:
+ return _types.DOUBLE
+
+ raise ValueError("Can't convert dlpack data type to Arrow type")
+
+cdef object view_to_dlpack(CBufferView view, stream=None):
# Define DLDevice and DLDataType struct and
# with that check for data type support first
cdef DLDevice device = view_to_dlpack_device(view)
@@ -146,6 +178,32 @@ cdef object view_to_dlpack(CBufferView view):
Py_INCREF(view)
dlm_tensor.deleter = view_dlpack_deleter
+ # stream has a DLPack + device specific interpretation
+
+ # nanoarrow_device needs a CUstream* (i.e., a CUstream_st**), but dlpack
+ # gives us a CUstream_st*.
+ cdef void* cuda_pstream
+
+ if view._event.device is DEVICE_CPU:
+ if stream is not None and stream != -1:
+ raise ValueError("dlpack stream must be None or -1 for the CPU
device")
+ elif view._event.device.device_type_id == ARROW_DEVICE_CUDA:
+ if stream == 0:
+ raise ValueError("dlpack stream value of 0 is not permitted for
CUDA")
+ elif stream == -1:
+ # Sentinel for "do not synchronize"
+ pass
+ elif stream in (1, 2):
+ # Technically we are mixing the per-thread and legacy default
streams here;
+ # however, the nanoarrow_device API currently has no mechanism to
expose
+ # a pointer to these streams specifically.
+ cuda_pstream = <void*>0
+ view._event.synchronize_stream(<uintptr_t>&cuda_pstream)
+ else:
+ # Otherwise, this is a CUstream (i.e., CUstream_st*)
+ cuda_pstream = <void*><uintptr_t>stream
+ view._event.synchronize_stream(<uintptr_t>&cuda_pstream)
+
return PyCapsule_New(dlm_tensor, 'dltensor', pycapsule_dlpack_deleter)
@@ -163,15 +221,31 @@ cdef DLDevice view_to_dlpack_device(CBufferView view):
raise ValueError('DataType is not compatible with DLPack spec: ' +
view.data_type)
# Define DLDevice struct
- if view._device.device_type_id == ARROW_DEVICE_CPU:
+ cdef ArrowDevice* arrow_device = view._event.device._ptr
+ if arrow_device.device_type is ARROW_DEVICE_CPU:
+ # DLPack uses 0 for the CPU device id where Arrow uses -1
device.device_type = kDLCPU
device.device_id = 0
else:
- raise ValueError('Only CPU device is currently supported.')
+ # Otherwise, Arrow's device identifiers and types are intentionally
+ # identical to DLPack
+ device.device_type = <DLDeviceType>arrow_device.device_type
+ device.device_id = arrow_device.device_id
return device
+cdef bint dlpack_strides_are_contiguous(DLTensor* dl_tensor):
+ if dl_tensor.strides == NULL:
+ return True
+
+ if dl_tensor.ndim != 1:
+ raise NotImplementedError("Contiguous stride check not implemented for
ndim != 1")
+
+ # DLTensor strides are in elements, not bytes
+ return dl_tensor.strides[0] == 1
+
+
cdef class CBufferView:
"""Wrapper for Array buffer content
@@ -183,12 +257,12 @@ cdef class CBufferView:
def __cinit__(self, object base, uintptr_t addr, int64_t size_bytes,
ArrowType data_type,
- Py_ssize_t element_size_bits, Device device):
+ Py_ssize_t element_size_bits, CSharedSyncEvent event):
self._base = base
self._ptr.data.data = <void*>addr
self._ptr.size_bytes = size_bytes
self._data_type = data_type
- self._device = device
+ self._event = event
self._format[0] = 0
self._element_size_bits = _types.to_format(
self._data_type,
@@ -208,7 +282,7 @@ cdef class CBufferView:
@property
def device(self):
- return self._device
+ return self._event.device
@property
def element_size_bits(self):
@@ -439,7 +513,7 @@ cdef class CBufferView:
Parameters
----------
stream : int, optional
- A Python integer representing a pointer to a stream. Currently not
supported.
+ A Python integer representing a pointer to a stream.
Stream is provided by the consumer to the producer to instruct the
producer
to ensure that operations can safely be performed on the array.
@@ -449,13 +523,7 @@ cdef class CBufferView:
A DLPack capsule for the array, pointing to a DLManagedTensor.
"""
# Note: parent offset not applied!
-
- if stream is None:
- return view_to_dlpack(self)
- else:
- raise NotImplementedError(
- "Only stream=None is supported."
- )
+ return view_to_dlpack(self, stream)
def __dlpack_device__(self):
@@ -469,9 +537,8 @@ cdef class CBufferView:
CPU = 1, see python/src/nanoarrow/dpack_abi.h) and index of the
device which is 0 by default for CPU.
"""
- return (view_to_dlpack_device(self).device_type,
- view_to_dlpack_device(self).device_id)
-
+ cdef DLDevice dlpack_device = view_to_dlpack_device(self)
+ return dlpack_device.device_type, dlpack_device.device_id
# These are special methods, which can't be cdef and we can't
# call them from elsewhere. We implement the logic for the buffer
@@ -483,7 +550,7 @@ cdef class CBufferView:
self._do_releasebuffer(buffer)
cdef _do_getbuffer(self, Py_buffer *buffer, int flags):
- if self._device is not DEVICE_CPU:
+ if self.device is not DEVICE_CPU:
raise RuntimeError("CBufferView is not a CPU buffer")
if flags & PyBUF_WRITABLE:
@@ -519,6 +586,9 @@ cdef class CBuffer:
Like the CBufferView, the CBuffer represents readable buffer content;
however,
unlike the CBufferView, the CBuffer always represents a valid ArrowBuffer
C object.
+ Whereas the CBufferView is primarily concerned with accessing the contents
of a
+ buffer, the CBuffer is primarily concerned with managing ownership of an
external
+ buffer such that it can be exported as an Arrow array.
"""
def __cinit__(self):
@@ -531,7 +601,11 @@ cdef class CBuffer:
self._format[0] = 66
self._format[1] = 0
self._get_buffer_count = 0
- self._view = CBufferView(None, 0, 0, _types.BINARY, 0, self._device)
+ self._view = CBufferView(
+ None, 0,
+ 0, _types.BINARY, 0,
+ CSharedSyncEvent(self._device)
+ )
cdef _assert_valid(self):
if self._ptr == NULL:
@@ -549,19 +623,47 @@ cdef class CBuffer:
self._view = CBufferView(
self._base, <uintptr_t>self._ptr.data,
self._ptr.size_bytes, self._data_type, self._element_size_bits,
- self._device
+ CSharedSyncEvent(self._device)
)
snprintf(self._view._format, sizeof(self._view._format), "%s",
self._format)
+ def view(self):
+ """Export this buffer as a CBufferView
+
+ Returns a :class:`CBufferView` of this buffer. After calling this
+ method, the original CBuffer will be invalidated and cannot be used.
+ In general, the view of the buffer should be used to consume a buffer
+ (whereas the CBuffer is primarily used to wrap an existing object in
+ a way that it can be used to build a :class:`CArray`).
+ """
+ self._assert_valid()
+ self._assert_buffer_count_zero()
+ cdef ArrowBuffer* new_ptr
+ self._view._base = _utils.alloc_c_buffer(&new_ptr)
+ ArrowBufferMove(self._ptr, new_ptr)
+ self._ptr = NULL
+ return self._view
+
@staticmethod
def empty():
+ """Create an empty CBuffer"""
cdef CBuffer out = CBuffer()
out._base = _utils.alloc_c_buffer(&out._ptr)
return out
@staticmethod
- def from_pybuffer(obj):
+ def from_pybuffer(obj) -> CBuffer:
+ """Create a CBuffer using the Python buffer protocol
+
+ Wraps a buffer using the Python buffer protocol as a CBuffer that can
be
+ used to create an array.
+
+ Parameters
+ ----------
+ obj : buffer-like
+ The object on which to invoke the Python buffer protocol
+ """
cdef CBuffer out = CBuffer()
out._base = _utils.alloc_c_buffer(&out._ptr)
out._set_format(_utils.c_buffer_set_pybuffer(obj, &out._ptr))
@@ -569,6 +671,55 @@ cdef class CBuffer:
out._populate_view()
return out
+ @staticmethod
+ def from_dlpack(obj, stream=None) -> CBuffer:
+ """Create a CBuffer using the DLPack protocol
+
+ Wraps a tensor from an external library as a CBuffer that can be used
+ to create an array.
+
+ Parameters
+ ----------
+ obj : object with a ``__dlpack__`` attribute
+ The object on which to invoke the DLPack protocol
+ stream : int, optional
+ The stream on which the tensor represented by obj should be made
+ safe for use. This value is passed to the object's ``__dlpack__``
+ method; however, the CBuffer does not keep any record of this
(i.e.,
+ the caller is responsible for creating a sync event after creating
one
+ or more buffers in this way).
+ """
+ capsule = obj.__dlpack__(stream=stream)
+ cdef DLManagedTensor* dlm_tensor =
<DLManagedTensor*>PyCapsule_GetPointer(
+ capsule, "dltensor"
+ )
+ cdef DLTensor* dl_tensor = &dlm_tensor.dl_tensor
+
+ if not dlpack_strides_are_contiguous(dl_tensor):
+ raise ValueError("Non-contiguous dlpack strides not supported")
+
+ cdef Device device = Device.resolve(
+ dl_tensor.device.device_type,
+ dl_tensor.device.device_id
+ )
+ cdef int arrow_type = dlpack_data_type_to_arrow(dl_tensor.dtype)
+ cdef uint8_t* data_ptr = <uint8_t*>dl_tensor.data +
dl_tensor.byte_offset
+
+ cdef int64_t size_bytes = 1
+ cdef int64_t element_size_bytes = dl_tensor.dtype.bits // 8
+ for i in range(dl_tensor.ndim):
+ size_bytes *= dl_tensor.shape[i] * element_size_bytes
+
+ cdef CBuffer out = CBuffer()
+
+ out._base = _utils.alloc_c_buffer(&out._ptr)
+ _utils.c_buffer_set_pyobject(capsule, data_ptr, size_bytes, &out._ptr)
+ out._set_data_type(arrow_type)
+ out._device = device
+ out._populate_view()
+
+ return out
+
def _set_format(self, str format):
self._assert_buffer_count_zero()
@@ -609,14 +760,17 @@ cdef class CBuffer:
@property
def data_type(self):
+ self._assert_valid()
return ArrowTypeString(self._data_type).decode("UTF-8")
@property
def data_type_id(self):
+ self._assert_valid()
return self._data_type
@property
def element_size_bits(self):
+ self._assert_valid()
return self._element_size_bits
@property
@@ -626,8 +780,14 @@ cdef class CBuffer:
@property
def format(self):
+ self._assert_valid()
return self._format.decode("UTF-8")
+ @property
+ def device(self):
+ self._assert_valid()
+ return self._view.device
+
def __len__(self):
self._assert_valid()
return len(self._view)
diff --git a/python/src/nanoarrow/_device.pxd b/python/src/nanoarrow/_device.pxd
index e42618a8..1eae9dc8 100644
--- a/python/src/nanoarrow/_device.pxd
+++ b/python/src/nanoarrow/_device.pxd
@@ -17,8 +17,17 @@
# cython: language_level = 3
+from libc.stdint cimport uintptr_t
+
from nanoarrow_device_c cimport ArrowDevice
cdef class Device:
cdef object _base
cdef ArrowDevice* _ptr
+
+cdef class CSharedSyncEvent:
+ cdef Device device
+ cdef void* sync_event
+
+ cdef synchronize(self)
+ cdef synchronize_stream(self, uintptr_t stream)
diff --git a/python/src/nanoarrow/_device.pyx b/python/src/nanoarrow/_device.pyx
index d1f507db..2ef383fc 100644
--- a/python/src/nanoarrow/_device.pyx
+++ b/python/src/nanoarrow/_device.pyx
@@ -39,6 +39,8 @@ from nanoarrow_device_c cimport (
ArrowDeviceResolve
)
+from nanoarrow._utils cimport Error
+
from enum import Enum
from nanoarrow import _repr_utils
@@ -84,6 +86,13 @@ cdef class Device:
self._base = base,
self._ptr = <ArrowDevice*>addr
+ def __eq__(self, other) -> bool:
+ return (
+ isinstance(other, Device) and
+ other.device_type == self.device_type and
+ other.device_id == self.device_id
+ )
+
def __repr__(self):
return _repr_utils.device_repr(self)
@@ -114,3 +123,27 @@ cdef class Device:
# Cache the CPU device
# The CPU device is statically allocated (so base is None)
DEVICE_CPU = Device(None, <uintptr_t>ArrowDeviceCpu())
+
+
+cdef class CSharedSyncEvent:
+
+ def __cinit__(self, Device device, uintptr_t sync_event=0):
+ self.device = device
+ self.sync_event = <void*>sync_event
+
+ cdef synchronize(self):
+ if self.sync_event == NULL:
+ return
+
+ cdef Error error = Error()
+ cdef ArrowDevice* c_device = self.device._ptr
+ cdef int code = c_device.synchronize_event(c_device, self.sync_event,
NULL, &error.c_error)
+ error.raise_message_not_ok("ArrowDevice::synchronize_event", code)
+
+ self.sync_event = NULL
+
+ cdef synchronize_stream(self, uintptr_t stream):
+ cdef Error error = Error()
+ cdef ArrowDevice* c_device = self.device._ptr
+ cdef int code = c_device.synchronize_event(c_device, self.sync_event,
<void*>stream, &error.c_error)
+ error.raise_message_not_ok("ArrowDevice::synchronize_event with
stream", code)
diff --git a/python/src/nanoarrow/_repr_utils.py
b/python/src/nanoarrow/_repr_utils.py
index df53a206..c695d807 100644
--- a/python/src/nanoarrow/_repr_utils.py
+++ b/python/src/nanoarrow/_repr_utils.py
@@ -185,7 +185,8 @@ def buffer_view_repr(buffer_view, max_char_width=80):
+ buffer_view_preview_cpu(buffer_view, max_char_width -
len(prefix) - 2)
)
else:
- return prefix
+ dev_info =
f"<{buffer_view.device.device_type}/{buffer_view.device.device_id}>"
+ return prefix + dev_info
def buffer_view_preview_cpu(buffer_view, max_char_width):
diff --git a/python/src/nanoarrow/_utils.pxd b/python/src/nanoarrow/_utils.pxd
index dc2b33a4..30bb9a53 100644
--- a/python/src/nanoarrow/_utils.pxd
+++ b/python/src/nanoarrow/_utils.pxd
@@ -17,6 +17,8 @@
# cython: language_level = 3
+from libc.stdint cimport uint8_t, int64_t
+
from nanoarrow_c cimport (
ArrowSchema,
ArrowArray,
@@ -46,6 +48,8 @@ cdef void c_device_array_shallow_copy(object base, const
ArrowDeviceArray* src,
cdef object c_buffer_set_pybuffer(object obj, ArrowBuffer** c_buffer)
+cdef void c_buffer_set_pyobject(object base, uint8_t* data, int64_t
size_bytes, ArrowBuffer** c_buffer)
+
cdef class Error:
cdef ArrowError c_error
diff --git a/python/src/nanoarrow/_utils.pyx b/python/src/nanoarrow/_utils.pyx
index f27e4238..b261fb29 100644
--- a/python/src/nanoarrow/_utils.pyx
+++ b/python/src/nanoarrow/_utils.pyx
@@ -493,3 +493,29 @@ cdef object c_buffer_set_pybuffer(object obj,
ArrowBuffer** c_buffer):
# Return the calculated components
return format
+
+
+cdef void c_deallocate_pyobject(ArrowBufferAllocator* allocator, uint8_t* ptr,
int64_t size) noexcept with gil:
+ """ArrowBufferDeallocatorCallback for an ArrowBuffer wrapping a
Py_Buffer"""
+ Py_DECREF(<object>allocator.private_data)
+
+
+cdef ArrowBufferAllocator c_pyobject_deallocator(object obj):
+ """ArrowBufferAllocator implementation wrapping a PyObject"""
+ Py_INCREF(obj)
+ return ArrowBufferDeallocator(
+ <ArrowBufferDeallocatorCallback>&c_deallocate_pyobject,
+ <void*>obj
+ )
+
+cdef void c_buffer_set_pyobject(object base, uint8_t* data, int64_t
size_bytes, ArrowBuffer** c_buffer):
+ """Manage a Python object reference as an ArrowBuffer
+
+ Calls ``Py_INCREF()`` on base and populates ``c_buffer`` with an
``ArrowBuffer``
+ whose allocator has been set such that when ``ArrowBufferReset()`` is
invoked,
+ the reference to base will be released with ``Py_DECREF()``.
+ """
+ c_buffer[0].data = data
+ c_buffer[0].size_bytes = size_bytes
+ c_buffer[0].capacity_bytes = 0
+ c_buffer[0].allocator = c_pyobject_deallocator(base)
diff --git a/python/src/nanoarrow/c_array.py b/python/src/nanoarrow/c_array.py
index 062b8033..d0250456 100644
--- a/python/src/nanoarrow/c_array.py
+++ b/python/src/nanoarrow/c_array.py
@@ -15,10 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Any, Iterable, Literal, Tuple
+from typing import Any, Iterable, Literal, Tuple, Union
-from nanoarrow._array import CArray, CArrayBuilder, CArrayView
+from nanoarrow._array import CArray, CArrayBuilder, CArrayView, CDeviceArray
from nanoarrow._buffer import CBuffer, CBufferBuilder, NoneAwareWrapperIterator
+from nanoarrow._device import DEVICE_CPU, Device
from nanoarrow._schema import CSchema, CSchemaBuilder
from nanoarrow._utils import obj_is_buffer, obj_is_capsule
from nanoarrow.c_buffer import c_buffer
@@ -201,6 +202,7 @@ def c_array_from_buffers(
children: Iterable[Any] = (),
validation_level: Literal[None, "full", "default", "minimal", "none"] =
None,
move: bool = False,
+ device: Union[Device, None] = None,
) -> CArray:
"""Create an ArrowArray wrapper from components
@@ -241,6 +243,10 @@ def c_array_from_buffers(
move : bool, optional
Use ``True`` to move ownership of any input buffers or children to the
output array.
+ device : Device, optional
+ An explicit device to use when constructing this array. If specified,
+ this function will construct a :class:`CDeviceArray`; if unspecified,
+ this function will construct a :class:`CArray` on the CPU device.
Examples
--------
@@ -258,8 +264,14 @@ def c_array_from_buffers(
- dictionary: NULL
- children[0]:
"""
+ if device is None:
+ explicit_device = False
+ device = DEVICE_CPU
+ else:
+ explicit_device = True
+
schema = c_schema(schema)
- builder = CArrayBuilder.allocate()
+ builder = CArrayBuilder.allocate(device)
# Ensures that the output array->n_buffers is set and that the correct
number
# of children have been initialized.
@@ -282,7 +294,10 @@ def c_array_from_buffers(
for child_src in children:
# If we're setting a CArray from something else, we can avoid an extra
# level of Python wrapping by using move=True
- move = move or not isinstance(child_src, CArray)
+ move = move or not isinstance(child_src, (CArray, CDeviceArray))
+ if move and isinstance(child_src, CDeviceArray):
+ child_src = child_src.array
+
builder.set_child(n_children, c_array(child_src), move=move)
n_children += 1
@@ -297,8 +312,12 @@ def c_array_from_buffers(
# Calculates the null count if -1 (and if applicable)
builder.resolve_null_count()
- # Validate + finish
- return builder.finish(validation_level=validation_level)
+ # Validate + finish. If device is specified (even CPU), always
+ # return a device array.
+ if not explicit_device:
+ return builder.finish(validation_level=validation_level)
+ else:
+ return builder.finish_device()
class ArrayBuilder:
diff --git a/python/src/nanoarrow/c_buffer.py b/python/src/nanoarrow/c_buffer.py
index f1c1fd4b..6eb16fe0 100644
--- a/python/src/nanoarrow/c_buffer.py
+++ b/python/src/nanoarrow/c_buffer.py
@@ -43,9 +43,10 @@ def c_buffer(obj, schema=None) -> CBuffer:
Parameters
----------
- obj : buffer-like or iterable
- A Python object that supports the Python buffer protocol. This includes
- bytes, memoryview, bytearray, bulit-in types as well as numpy arrays.
+ obj : buffer-like, tensor, or iterable
+ A Python object that supports the Python buffer or DLPack protocols.
+ This includes bytes, memoryview, bytearray, built-in types as well
+ as numpy arrays.
schema : schema-like, optional
The data type of the desired buffer as sanitized by
:func:`c_schema`. Only values that make sense as buffer types are
@@ -71,6 +72,13 @@ def c_buffer(obj, schema=None) -> CBuffer:
)
return CBuffer.from_pybuffer(obj)
+ if _obj_is_tensor(obj):
+ if schema is not None:
+ raise NotImplementedError(
+ "c_buffer() with schema for DLPack is not implemented"
+ )
+ return CBuffer.from_dlpack(obj)
+
if _obj_is_iterable(obj):
buffer, _ = _c_buffer_from_iterable(obj, schema)
return buffer
@@ -122,3 +130,7 @@ def _c_buffer_from_iterable(obj, schema=None) -> CBuffer:
def _obj_is_iterable(obj):
return hasattr(obj, "__iter__")
+
+
+def _obj_is_tensor(obj):
+ return hasattr(obj, "__dlpack__")
diff --git a/python/src/nanoarrow/device.py b/python/src/nanoarrow/device.py
index ee62cf5c..032f7e87 100644
--- a/python/src/nanoarrow/device.py
+++ b/python/src/nanoarrow/device.py
@@ -17,7 +17,9 @@
from nanoarrow._array import CDeviceArray
from nanoarrow._device import DEVICE_CPU, Device, DeviceType # noqa: F401
-from nanoarrow.c_array import c_array
+from nanoarrow._schema import CSchemaBuilder
+from nanoarrow.c_array import c_array, c_array_from_buffers
+from nanoarrow.c_buffer import c_buffer
from nanoarrow.c_schema import c_schema
@@ -25,11 +27,28 @@ def cpu():
return DEVICE_CPU
-def resolve(device_type, device_id):
- return Device.resolve(device_type, device_id)
+def resolve(device_type: DeviceType, device_id: int):
+ return Device.resolve(DeviceType(device_type).value, device_id)
def c_device_array(obj, schema=None):
+ """ArrowDeviceArray wrapper
+
+ This class provides a user-facing interface to access the fields of an
+ ArrowDeviceArray.
+
+ These objects are created using :func:`c_device_array`, which accepts any
+ device array or array-like object according to the Arrow device PyCapsule
+ interface, the DLPack protocol, or any object accepted by :func:`c_array`.
+
+ Parameters
+ ----------
+ obj : device array-like
+ An object supporting the Arrow device PyCapsule interface, the DLPack
+ protocol, or any object accepted by :func:`c_array`.
+ schema : schema-like or None
+ A schema-like object as sanitized by :func:`c_schema` or None.
+ """
if schema is not None:
schema = c_schema(schema)
@@ -43,6 +62,18 @@ def c_device_array(obj, schema=None):
)
    return CDeviceArray._import_from_c_capsule(schema_capsule, device_array_capsule)
+ if hasattr(obj, "__dlpack__"):
+ buffer = c_buffer(obj, schema=schema)
+        schema = CSchemaBuilder.allocate().set_type(buffer.data_type_id).finish()
+ return c_array_from_buffers(
+ schema,
+ len(buffer),
+ [None, buffer],
+ null_count=0,
+ move=True,
+ device=buffer.device,
+ )
+
# Attempt to create a CPU array and wrap it
cpu_array = c_array(obj, schema=schema)
    return CDeviceArray._init_from_array(cpu(), cpu_array._addr(), cpu_array.schema)
diff --git a/python/src/nanoarrow/_device.pxd b/python/tests/conftest.py
similarity index 79%
copy from python/src/nanoarrow/_device.pxd
copy to python/tests/conftest.py
index e42618a8..b2707884 100644
--- a/python/src/nanoarrow/_device.pxd
+++ b/python/tests/conftest.py
@@ -15,10 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-# cython: language_level = 3
+import pytest
-from nanoarrow_device_c cimport ArrowDevice
-cdef class Device:
- cdef object _base
- cdef ArrowDevice* _ptr
+@pytest.fixture
+def cuda_device():
+ from nanoarrow.device import DeviceType, resolve
+
+ try:
+ return resolve(DeviceType.CUDA, 0)
+ except ValueError:
+ return None
diff --git a/python/tests/test_c_array.py b/python/tests/test_c_array.py
index a28bacd5..e4d98f17 100644
--- a/python/tests/test_c_array.py
+++ b/python/tests/test_c_array.py
@@ -24,6 +24,7 @@ from nanoarrow._utils import NanoarrowException
from nanoarrow.c_schema import c_schema_view
import nanoarrow as na
+from nanoarrow import device
def test_c_array_from_c_array():
@@ -520,6 +521,17 @@ def test_c_array_from_buffers_validation():
assert c_array.length == 2
+def test_c_array_from_buffers_device():
+ c_device_array = na.c_array_from_buffers(
+ na.uint8(), 5, [None, b"12345"], device=device.cpu()
+ )
+ assert isinstance(c_device_array, device.CDeviceArray)
+
+ c_array = na.c_array(c_device_array)
+ assert c_array.length == 5
+ assert bytes(c_array.view().buffer(1)) == b"12345"
+
+
def test_c_array_timestamp_seconds():
d1 = int(round(datetime(1970, 1, 1, tzinfo=timezone.utc).timestamp()))
d2 = int(round(datetime(1985, 12, 31, tzinfo=timezone.utc).timestamp()))
@@ -610,3 +622,84 @@ def test_c_array_duration():
d2_duration_in_ms,
d3_duration_in_ms,
]
+
+
+def test_device_array_errors(cuda_device):
+ if not cuda_device:
+ pytest.skip("CUDA device not available")
+
+ # Check that we can't create a CUDA array from CPU buffers
+ with pytest.raises(ValueError, match="are not identical"):
+ na.c_array_from_buffers(
+ na.int64(),
+ 3,
+ [None, na.c_buffer([1, 2, 3], na.int64())],
+ device=cuda_device,
+ )
+
+ # Check that we can't create a CUDA array from CPU children
+ with pytest.raises(ValueError, match="are not identical"):
+ na.c_array_from_buffers(
+ na.struct([na.int64()]),
+ length=0,
+ buffers=[None],
+ children=[na.c_array([], na.int64())],
+ device=cuda_device,
+ )
+
+
+def test_array_from_dlpack_cuda(cuda_device):
+ from nanoarrow.device import CDeviceArray, DeviceType
+
+ cp = pytest.importorskip("cupy")
+ if not cuda_device:
+ pytest.skip("CUDA device not available")
+
+ gpu_validity = cp.array([255], cp.uint8)
+ gpu_array = cp.array([1, 2, 3], cp.int64)
+
+ c_array = na.c_array_from_buffers(
+ na.int64(),
+ 3,
+ [gpu_validity, gpu_array],
+ move=True,
+ device=cuda_device,
+ )
+ assert isinstance(c_array, CDeviceArray)
+ assert c_array.device_type == DeviceType.CUDA
+ assert c_array.device_id == 0
+
+ c_array_view = c_array.view()
+ assert c_array_view.storage_type == "int64"
+ assert c_array_view.buffer(0).device == cuda_device
+ assert c_array_view.buffer(1).device == cuda_device
+ assert len(c_array_view) == 3
+
+    # Make sure we don't attempt accessing a GPU buffer to calculate the null count
+ assert c_array_view.null_count == -1
+
+ # Check that buffers made it all the way through
+    cp.testing.assert_array_equal(cp.from_dlpack(c_array_view.buffer(1)), gpu_array)
+
+ # Also check a nested array
+ c_array_struct = na.c_array_from_buffers(
+ na.struct([na.int64()]),
+ 3,
+ buffers=[None],
+ children=[c_array],
+ move=True,
+ device=cuda_device,
+ )
+ assert c_array_struct.device_type == DeviceType.CUDA
+ assert c_array_struct.device_id == 0
+
+ c_array_view_struct = c_array_struct.view()
+ assert c_array_view_struct.storage_type == "struct"
+ assert c_array_view_struct.buffer(0).device == cuda_device
+
+ c_array_view_child = c_array_view_struct.child(0)
+ assert c_array_view_child.buffer(0).device == cuda_device
+ assert c_array_view_child.buffer(1).device == cuda_device
+ cp.testing.assert_array_equal(
+ cp.from_dlpack(c_array_view_child.buffer(1)), gpu_array
+ )
diff --git a/python/tests/test_device.py b/python/tests/test_device.py
index 09d897a5..a92cfc85 100644
--- a/python/tests/test_device.py
+++ b/python/tests/test_device.py
@@ -60,6 +60,27 @@ def test_c_device_array():
assert array.buffers == darray.array.buffers
+def test_c_device_array_from_dlpack(cuda_device):
+ from nanoarrow.device import DeviceType
+
+ cp = pytest.importorskip("cupy")
+ if not cuda_device:
+ pytest.skip("CUDA device not available")
+
+ assert cuda_device.device_type == DeviceType.CUDA
+ assert cuda_device.device_id == 0
+
+ gpu_array = cp.array([1, 2, 3])
+ darray = device.c_device_array(gpu_array)
+ assert isinstance(darray, device.CDeviceArray)
+ assert darray.device_type == DeviceType.CUDA
+ assert len(darray.array) == 3
+ assert darray.array.null_count == 0
+
+ gpu_buffer_view = darray.array.view().buffer(1)
+ cp.testing.assert_array_equal(cp.from_dlpack(gpu_buffer_view), gpu_array)
+
+
def test_c_device_array_protocol():
    # Wrapper to prevent c_device_array() from returning early when it detects the
# input is already a CDeviceArray
diff --git a/python/tests/test_dlpack.py b/python/tests/test_dlpack.py
index 03e379f8..62357693 100644
--- a/python/tests/test_dlpack.py
+++ b/python/tests/test_dlpack.py
@@ -16,6 +16,7 @@
# under the License.
import pytest
+from nanoarrow._buffer import CBuffer
from nanoarrow._utils import obj_is_capsule
import nanoarrow as na
@@ -24,13 +25,20 @@ np = pytest.importorskip("numpy")
def check_dlpack_export(view, expected_arr):
- DLTensor = view.__dlpack__()
- assert obj_is_capsule(DLTensor, "dltensor") is True
+ # Check device spec
+ assert view.__dlpack_device__() == (1, 0)
+
+ # Check capsule export
+ capsule = view.__dlpack__()
+ assert obj_is_capsule(capsule, "dltensor") is True
+ # Check roundtrip through numpy
result = np.from_dlpack(view)
np.testing.assert_array_equal(result, expected_arr, strict=True)
- assert view.__dlpack_device__() == (1, 0)
+ # Check roundtrip through CBuffer
+ buffer = CBuffer.from_dlpack(view)
+ np.testing.assert_array_equal(np.array(buffer), expected_arr, strict=True)
@pytest.mark.parametrize(
@@ -78,5 +86,15 @@ def test_dlpack_not_supported():
):
view.__dlpack_device__()
- with pytest.raises(NotImplementedError, match="Only stream=None is
supported."):
- view.__dlpack__(stream=3)
+
+def test_dlpack_cuda(cuda_device):
+ cp = pytest.importorskip("cupy")
+ if not cuda_device:
+ pytest.skip("CUDA device not available")
+
+ gpu_array = cp.array([1, 2, 3])
+ gpu_buffer = na.c_buffer(gpu_array)
+ assert gpu_buffer.device == cuda_device
+
+ gpu_array_roundtrip = cp.from_dlpack(gpu_buffer.view())
+ cp.testing.assert_array_equal(gpu_array_roundtrip, gpu_array)