This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ffee537d88 GH-42222: [Python] Add bindings for CopyTo on RecordBatch
and Array classes (#42223)
ffee537d88 is described below
commit ffee537d88ab6d26614e2a1e85d4d18152695020
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Wed Aug 21 14:18:45 2024 +0200
GH-42222: [Python] Add bindings for CopyTo on RecordBatch and Array classes
(#42223)
### Rationale for this change
We previously added bindings for the Device and MemoryManager classes
(https://github.com/apache/arrow/issues/41126). As a next step, we can
expose the C++ `CopyTo` functionality to copy a full Array or RecordBatch
to a specific memory manager.
### What changes are included in this PR?
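This adds a `copy_to` method to pyarrow `Array` and `RecordBatch`. The
destination can be either a `pyarrow.MemoryManager` or a `pyarrow.Device`
(in which case the device's default memory manager is used); any other type
raises a `TypeError`. The CUDA tests now use `copy_to` instead of
hand-written helpers that copied each buffer individually.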
### Are these changes tested?
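Yes: `test_device.py` covers copying to the CPU memory manager and device,
and the `TypeError` for an invalid destination. `test_cuda.py` covers
round-trips to and from a CUDA device.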
* GitHub Issue: #42222
Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
python/pyarrow/array.pxi | 36 ++++++++++++++++
python/pyarrow/device.pxi | 6 +++
python/pyarrow/includes/libarrow.pxd | 4 ++
python/pyarrow/lib.pxd | 4 ++
python/pyarrow/table.pxi | 35 +++++++++++++++
python/pyarrow/tests/test_cuda.py | 82 ++++++++++++++----------------------
python/pyarrow/tests/test_device.py | 26 ++++++++++++
7 files changed, 143 insertions(+), 50 deletions(-)
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 4c3eb93232..77d6c9c06d 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1702,6 +1702,42 @@ cdef class Array(_PandasConvertible):
_append_array_buffers(self.sp_array.get().data().get(), res)
return res
+ def copy_to(self, destination):
+ """
+ Construct a copy of the array with all buffers on destination
+ device.
+
+ This method recursively copies the array's buffers and those of its
+ children onto the destination MemoryManager device and returns the
+ new Array.
+
+ Parameters
+ ----------
+ destination : pyarrow.MemoryManager or pyarrow.Device
+ The destination device to copy the array to.
+
+ Returns
+ -------
+ Array
+ """
+ cdef:
+ shared_ptr[CArray] c_array
+ shared_ptr[CMemoryManager] c_memory_manager
+
+ if isinstance(destination, Device):
+ c_memory_manager =
(<Device>destination).unwrap().get().default_memory_manager()
+ elif isinstance(destination, MemoryManager):
+ c_memory_manager = (<MemoryManager>destination).unwrap()
+ else:
+ raise TypeError(
+ "Argument 'destination' has incorrect type (expected a "
+ f"pyarrow Device or MemoryManager, got {type(destination)})"
+ )
+
+ with nogil:
+ c_array = GetResultValue(self.ap.CopyTo(c_memory_manager))
+ return pyarrow_wrap_array(c_array)
+
def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi
index 6e60347520..26256de620 100644
--- a/python/pyarrow/device.pxi
+++ b/python/pyarrow/device.pxi
@@ -64,6 +64,9 @@ cdef class Device(_Weakrefable):
self.init(device)
return self
+ cdef inline shared_ptr[CDevice] unwrap(self) nogil:
+ return self.device
+
def __eq__(self, other):
if not isinstance(other, Device):
return False
@@ -130,6 +133,9 @@ cdef class MemoryManager(_Weakrefable):
self.init(mm)
return self
+ cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil:
+ return self.memory_manager
+
def __repr__(self):
return "<pyarrow.MemoryManager device: {}>".format(
frombytes(self.memory_manager.get().device().get().ToString())
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index a54a1db292..6f510cfc0c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -234,7 +234,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CStatus Validate() const
CStatus ValidateFull() const
CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
+
CDeviceAllocationType device_type()
+ CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]&
to) const
shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
CResult[shared_ptr[CArray]] MakeArrayOfNull(
@@ -1027,6 +1029,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
+ CResult[shared_ptr[CRecordBatch]] CopyTo(const
shared_ptr[CMemoryManager]& to) const
+
CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool
row_major,
CMemoryPool* pool) const
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index e3625c1815..a7c3b496a0 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -542,6 +542,8 @@ cdef class Device(_Weakrefable):
@staticmethod
cdef wrap(const shared_ptr[CDevice]& device)
+ cdef inline shared_ptr[CDevice] unwrap(self) nogil
+
cdef class MemoryManager(_Weakrefable):
cdef:
@@ -552,6 +554,8 @@ cdef class MemoryManager(_Weakrefable):
@staticmethod
cdef wrap(const shared_ptr[CMemoryManager]& mm)
+ cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil
+
cdef class Buffer(_Weakrefable):
cdef:
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 8f7c44e55d..6d34c71c9d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3569,6 +3569,41 @@ cdef class RecordBatch(_Tabular):
row_major, pool))
return pyarrow_wrap_tensor(c_tensor)
+ def copy_to(self, destination):
+ """
+ Copy the entire RecordBatch to destination device.
+
+ This copies each column of the record batch to create
+ a new record batch where all underlying buffers for the columns have
+ been copied to the destination MemoryManager.
+
+ Parameters
+ ----------
+ destination : pyarrow.MemoryManager or pyarrow.Device
+ The destination device to copy the array to.
+
+ Returns
+ -------
+ RecordBatch
+ """
+ cdef:
+ shared_ptr[CRecordBatch] c_batch
+ shared_ptr[CMemoryManager] c_memory_manager
+
+ if isinstance(destination, Device):
+ c_memory_manager =
(<Device>destination).unwrap().get().default_memory_manager()
+ elif isinstance(destination, MemoryManager):
+ c_memory_manager = (<MemoryManager>destination).unwrap()
+ else:
+ raise TypeError(
+ "Argument 'destination' has incorrect type (expected a "
+ f"pyarrow Device or MemoryManager, got {type(destination)})"
+ )
+
+ with nogil:
+ c_batch = GetResultValue(self.batch.CopyTo(c_memory_manager))
+ return pyarrow_wrap_batch(c_batch)
+
def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
diff --git a/python/pyarrow/tests/test_cuda.py
b/python/pyarrow/tests/test_cuda.py
index 36b97a6206..d55be651b1 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -827,21 +827,29 @@ def test_IPC(size):
assert p.exitcode == 0
-def _arr_copy_to_host(carr):
- # TODO replace below with copy to device when exposed in python
- buffers = []
- for cbuf in carr.buffers():
- if cbuf is None:
- buffers.append(None)
- else:
- buf = global_context.foreign_buffer(
- cbuf.address, cbuf.size, cbuf
- ).copy_to_host()
- buffers.append(buf)
-
- child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
- new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
- return new
+def test_copy_to():
+ _, buf = make_random_buffer(size=10, target='device')
+ mm_cuda = buf.memory_manager
+
+ for dest in [mm_cuda, mm_cuda.device]:
+ arr = pa.array([0, 1, 2])
+ arr_cuda = arr.copy_to(dest)
+ assert not arr_cuda.buffers()[1].is_cpu
+ assert arr_cuda.buffers()[1].device_type ==
pa.DeviceAllocationType.CUDA
+ assert arr_cuda.buffers()[1].device == mm_cuda.device
+
+ arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager())
+ assert arr_roundtrip.equals(arr)
+
+ batch = pa.record_batch({"col": arr})
+ batch_cuda = batch.copy_to(dest)
+ buf_cuda = batch_cuda["col"].buffers()[1]
+ assert not buf_cuda.is_cpu
+ assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA
+ assert buf_cuda.device == mm_cuda.device
+
+ batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager())
+ assert batch_roundtrip.equals(batch)
def test_device_interface_array():
@@ -856,19 +864,10 @@ def test_device_interface_array():
typ = pa.list_(pa.int32())
arr = pa.array([[1], [2, 42]], type=typ)
- # TODO replace below with copy to device when exposed in python
- cbuffers = []
- for buf in arr.buffers():
- if buf is None:
- cbuffers.append(None)
- else:
- cbuf = global_context.new_buffer(buf.size)
- cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
- cbuffers.append(cbuf)
-
- carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
- pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
- ])
+ # copy to device
+ _, buf = make_random_buffer(size=10, target='device')
+ mm_cuda = buf.memory_manager
+ carr = arr.copy_to(mm_cuda)
# Type is known up front
carr._export_to_c_device(ptr_array)
@@ -882,7 +881,7 @@ def test_device_interface_array():
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, typ)
assert carr_new.type == pa.list_(pa.int32())
- arr_new = _arr_copy_to_host(carr_new)
+ arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
assert arr_new.equals(arr)
del carr_new
@@ -891,15 +890,13 @@ def test_device_interface_array():
pa.Array._import_from_c_device(ptr_array, typ)
# Schema is exported and imported at the same time
- carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
- pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
- ])
+ carr = arr.copy_to(mm_cuda)
carr._export_to_c_device(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
assert carr_new.type == pa.list_(pa.int32())
- arr_new = _arr_copy_to_host(carr_new)
+ arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
assert arr_new.equals(arr)
del carr_new
@@ -908,21 +905,6 @@ def test_device_interface_array():
pa.Array._import_from_c_device(ptr_array, ptr_schema)
-def _batch_copy_to_host(cbatch):
- # TODO replace below with copy to device when exposed in python
- arrs = []
- for col in cbatch.columns:
- buffers = [
- global_context.foreign_buffer(buf.address, buf.size,
buf).copy_to_host()
- if buf is not None else None
- for buf in col.buffers()
- ]
- new = pa.Array.from_buffers(col.type, len(col), buffers)
- arrs.append(new)
-
- return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)
-
-
def test_device_interface_batch_array():
cffi = pytest.importorskip("pyarrow.cffi")
ffi = cffi.ffi
@@ -949,7 +931,7 @@ def test_device_interface_batch_array():
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
assert cbatch_new.schema == schema
- batch_new = _batch_copy_to_host(cbatch_new)
+ batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
assert batch_new.equals(batch)
del cbatch_new
@@ -964,7 +946,7 @@ def test_device_interface_batch_array():
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
assert cbatch_new.schema == schema
- batch_new = _batch_copy_to_host(cbatch_new)
+ batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
assert batch_new.equals(batch)
del cbatch_new
diff --git a/python/pyarrow/tests/test_device.py
b/python/pyarrow/tests/test_device.py
index 6bdb015be1..dc1a51e6d0 100644
--- a/python/pyarrow/tests/test_device.py
+++ b/python/pyarrow/tests/test_device.py
@@ -17,6 +17,8 @@
import pyarrow as pa
+import pytest
+
def test_device_memory_manager():
mm = pa.default_cpu_memory_manager()
@@ -41,3 +43,27 @@ def test_buffer_device():
assert buf.device.is_cpu
assert buf.device == pa.default_cpu_memory_manager().device
assert buf.memory_manager.is_cpu
+
+
+def test_copy_to():
+ mm = pa.default_cpu_memory_manager()
+
+ arr = pa.array([0, 1, 2])
+ batch = pa.record_batch({"col": arr})
+
+ for dest in [mm, mm.device]:
+ arr_copied = arr.copy_to(dest)
+ assert arr_copied.equals(arr)
+ assert arr_copied.buffers()[1].device == mm.device
+ assert arr_copied.buffers()[1].address != arr.buffers()[1].address
+
+ batch_copied = batch.copy_to(dest)
+ assert batch_copied.equals(batch)
+ assert batch_copied["col"].buffers()[1].device == mm.device
+ assert batch_copied["col"].buffers()[1].address !=
arr.buffers()[1].address
+
+ with pytest.raises(TypeError, match="Argument 'destination' has incorrect
type"):
+ arr.copy_to(mm.device.device_type)
+
+ with pytest.raises(TypeError, match="Argument 'destination' has incorrect
type"):
+ batch.copy_to(mm.device.device_type)