This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ffee537d88 GH-42222: [Python] Add bindings for CopyTo on RecordBatch and Array classes (#42223)
ffee537d88 is described below

commit ffee537d88ab6d26614e2a1e85d4d18152695020
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Wed Aug 21 14:18:45 2024 +0200

    GH-42222: [Python] Add bindings for CopyTo on RecordBatch and Array classes (#42223)
    
    ### Rationale for this change
    
    We have added bindings for the Device and MemoryManager classes (https://github.com/apache/arrow/issues/41126), and as a next step we can expose the functionality to copy a full Array or RecordBatch to a specific memory manager.
    
    ### What changes are included in this PR?
    
    This adds a `copy_to` method on pyarrow Array and RecordBatch.
    
    ### Are these changes tested?
    
    Yes
    
    * GitHub Issue: #42222
    
    Authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 python/pyarrow/array.pxi             | 36 ++++++++++++++++
 python/pyarrow/device.pxi            |  6 +++
 python/pyarrow/includes/libarrow.pxd |  4 ++
 python/pyarrow/lib.pxd               |  4 ++
 python/pyarrow/table.pxi             | 35 +++++++++++++++
 python/pyarrow/tests/test_cuda.py    | 82 ++++++++++++++----------------------
 python/pyarrow/tests/test_device.py  | 26 ++++++++++++
 7 files changed, 143 insertions(+), 50 deletions(-)

diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 4c3eb93232..77d6c9c06d 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1702,6 +1702,42 @@ cdef class Array(_PandasConvertible):
         _append_array_buffers(self.sp_array.get().data().get(), res)
         return res
 
+    def copy_to(self, destination):
+        """
+        Construct a copy of the array with all buffers on destination
+        device.
+
+        This method recursively copies the array's buffers and those of its
+        children onto the destination MemoryManager device and returns the
+        new Array.
+
+        Parameters
+        ----------
+        destination : pyarrow.MemoryManager or pyarrow.Device
+            The destination device to copy the array to.
+
+        Returns
+        -------
+        Array
+        """
+        cdef:
+            shared_ptr[CArray] c_array
+            shared_ptr[CMemoryManager] c_memory_manager
+
+        if isinstance(destination, Device):
+            c_memory_manager = (<Device>destination).unwrap().get().default_memory_manager()
+        elif isinstance(destination, MemoryManager):
+            c_memory_manager = (<MemoryManager>destination).unwrap()
+        else:
+            raise TypeError(
+                "Argument 'destination' has incorrect type (expected a "
+                f"pyarrow Device or MemoryManager, got {type(destination)})"
+            )
+
+        with nogil:
+            c_array = GetResultValue(self.ap.CopyTo(c_memory_manager))
+        return pyarrow_wrap_array(c_array)
+
     def _export_to_c(self, out_ptr, out_schema_ptr=0):
         """
         Export to a C ArrowArray struct, given its pointer.
diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi
index 6e60347520..26256de620 100644
--- a/python/pyarrow/device.pxi
+++ b/python/pyarrow/device.pxi
@@ -64,6 +64,9 @@ cdef class Device(_Weakrefable):
         self.init(device)
         return self
 
+    cdef inline shared_ptr[CDevice] unwrap(self) nogil:
+        return self.device
+
     def __eq__(self, other):
         if not isinstance(other, Device):
             return False
@@ -130,6 +133,9 @@ cdef class MemoryManager(_Weakrefable):
         self.init(mm)
         return self
 
+    cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil:
+        return self.memory_manager
+
     def __repr__(self):
         return "<pyarrow.MemoryManager device: {}>".format(
             frombytes(self.memory_manager.get().device().get().ToString())
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a54a1db292..6f510cfc0c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -234,7 +234,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CStatus Validate() const
         CStatus ValidateFull() const
         CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
+
         CDeviceAllocationType device_type()
+        CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const
 
     shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
     CResult[shared_ptr[CArray]] MakeArrayOfNull(
@@ -1027,6 +1029,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
+        CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const
+
         CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major,
                                               CMemoryPool* pool) const
 
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index e3625c1815..a7c3b496a0 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -542,6 +542,8 @@ cdef class Device(_Weakrefable):
     @staticmethod
     cdef wrap(const shared_ptr[CDevice]& device)
 
+    cdef inline shared_ptr[CDevice] unwrap(self) nogil
+
 
 cdef class MemoryManager(_Weakrefable):
     cdef:
@@ -552,6 +554,8 @@ cdef class MemoryManager(_Weakrefable):
     @staticmethod
     cdef wrap(const shared_ptr[CMemoryManager]& mm)
 
+    cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil
+
 
 cdef class Buffer(_Weakrefable):
     cdef:
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 8f7c44e55d..6d34c71c9d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3569,6 +3569,41 @@ cdef class RecordBatch(_Tabular):
                                                                              
row_major, pool))
         return pyarrow_wrap_tensor(c_tensor)
 
+    def copy_to(self, destination):
+        """
+        Copy the entire RecordBatch to destination device.
+
+        This copies each column of the record batch to create
+        a new record batch where all underlying buffers for the columns have
+        been copied to the destination MemoryManager.
+
+        Parameters
+        ----------
+        destination : pyarrow.MemoryManager or pyarrow.Device
+            The destination device to copy the array to.
+
+        Returns
+        -------
+        RecordBatch
+        """
+        cdef:
+            shared_ptr[CRecordBatch] c_batch
+            shared_ptr[CMemoryManager] c_memory_manager
+
+        if isinstance(destination, Device):
+            c_memory_manager = (<Device>destination).unwrap().get().default_memory_manager()
+        elif isinstance(destination, MemoryManager):
+            c_memory_manager = (<MemoryManager>destination).unwrap()
+        else:
+            raise TypeError(
+                "Argument 'destination' has incorrect type (expected a "
+                f"pyarrow Device or MemoryManager, got {type(destination)})"
+            )
+
+        with nogil:
+            c_batch = GetResultValue(self.batch.CopyTo(c_memory_manager))
+        return pyarrow_wrap_batch(c_batch)
+
     def _export_to_c(self, out_ptr, out_schema_ptr=0):
         """
         Export to a C ArrowArray struct, given its pointer.
diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py
index 36b97a6206..d55be651b1 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -827,21 +827,29 @@ def test_IPC(size):
     assert p.exitcode == 0
 
 
-def _arr_copy_to_host(carr):
-    # TODO replace below with copy to device when exposed in python
-    buffers = []
-    for cbuf in carr.buffers():
-        if cbuf is None:
-            buffers.append(None)
-        else:
-            buf = global_context.foreign_buffer(
-                cbuf.address, cbuf.size, cbuf
-            ).copy_to_host()
-            buffers.append(buf)
-
-    child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
-    new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
-    return new
+def test_copy_to():
+    _, buf = make_random_buffer(size=10, target='device')
+    mm_cuda = buf.memory_manager
+
+    for dest in [mm_cuda, mm_cuda.device]:
+        arr = pa.array([0, 1, 2])
+        arr_cuda = arr.copy_to(dest)
+        assert not arr_cuda.buffers()[1].is_cpu
+        assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA
+        assert arr_cuda.buffers()[1].device == mm_cuda.device
+
+        arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager())
+        assert arr_roundtrip.equals(arr)
+
+        batch = pa.record_batch({"col": arr})
+        batch_cuda = batch.copy_to(dest)
+        buf_cuda = batch_cuda["col"].buffers()[1]
+        assert not buf_cuda.is_cpu
+        assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA
+        assert buf_cuda.device == mm_cuda.device
+
+        batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager())
+        assert batch_roundtrip.equals(batch)
 
 
 def test_device_interface_array():
@@ -856,19 +864,10 @@ def test_device_interface_array():
     typ = pa.list_(pa.int32())
     arr = pa.array([[1], [2, 42]], type=typ)
 
-    # TODO replace below with copy to device when exposed in python
-    cbuffers = []
-    for buf in arr.buffers():
-        if buf is None:
-            cbuffers.append(None)
-        else:
-            cbuf = global_context.new_buffer(buf.size)
-            cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
-            cbuffers.append(cbuf)
-
-    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
-        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
-    ])
+    # copy to device
+    _, buf = make_random_buffer(size=10, target='device')
+    mm_cuda = buf.memory_manager
+    carr = arr.copy_to(mm_cuda)
 
     # Type is known up front
     carr._export_to_c_device(ptr_array)
@@ -882,7 +881,7 @@ def test_device_interface_array():
     del carr
     carr_new = pa.Array._import_from_c_device(ptr_array, typ)
     assert carr_new.type == pa.list_(pa.int32())
-    arr_new = _arr_copy_to_host(carr_new)
+    arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
     assert arr_new.equals(arr)
 
     del carr_new
@@ -891,15 +890,13 @@ def test_device_interface_array():
         pa.Array._import_from_c_device(ptr_array, typ)
 
     # Schema is exported and imported at the same time
-    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
-        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
-    ])
+    carr = arr.copy_to(mm_cuda)
     carr._export_to_c_device(ptr_array, ptr_schema)
     # Delete and recreate C++ objects from exported pointers
     del carr
     carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
     assert carr_new.type == pa.list_(pa.int32())
-    arr_new = _arr_copy_to_host(carr_new)
+    arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
     assert arr_new.equals(arr)
 
     del carr_new
@@ -908,21 +905,6 @@ def test_device_interface_array():
         pa.Array._import_from_c_device(ptr_array, ptr_schema)
 
 
-def _batch_copy_to_host(cbatch):
-    # TODO replace below with copy to device when exposed in python
-    arrs = []
-    for col in cbatch.columns:
-        buffers = [
-            global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
-            if buf is not None else None
-            for buf in col.buffers()
-        ]
-        new = pa.Array.from_buffers(col.type, len(col), buffers)
-        arrs.append(new)
-
-    return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)
-
-
 def test_device_interface_batch_array():
     cffi = pytest.importorskip("pyarrow.cffi")
     ffi = cffi.ffi
@@ -949,7 +931,7 @@ def test_device_interface_batch_array():
     del cbatch
     cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
     assert cbatch_new.schema == schema
-    batch_new = _batch_copy_to_host(cbatch_new)
+    batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
     assert batch_new.equals(batch)
 
     del cbatch_new
@@ -964,7 +946,7 @@ def test_device_interface_batch_array():
     del cbatch
     cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
     assert cbatch_new.schema == schema
-    batch_new = _batch_copy_to_host(cbatch_new)
+    batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
     assert batch_new.equals(batch)
 
     del cbatch_new
diff --git a/python/pyarrow/tests/test_device.py b/python/pyarrow/tests/test_device.py
index 6bdb015be1..dc1a51e6d0 100644
--- a/python/pyarrow/tests/test_device.py
+++ b/python/pyarrow/tests/test_device.py
@@ -17,6 +17,8 @@
 
 import pyarrow as pa
 
+import pytest
+
 
 def test_device_memory_manager():
     mm = pa.default_cpu_memory_manager()
@@ -41,3 +43,27 @@ def test_buffer_device():
     assert buf.device.is_cpu
     assert buf.device == pa.default_cpu_memory_manager().device
     assert buf.memory_manager.is_cpu
+
+
+def test_copy_to():
+    mm = pa.default_cpu_memory_manager()
+
+    arr = pa.array([0, 1, 2])
+    batch = pa.record_batch({"col": arr})
+
+    for dest in [mm, mm.device]:
+        arr_copied = arr.copy_to(dest)
+        assert arr_copied.equals(arr)
+        assert arr_copied.buffers()[1].device == mm.device
+        assert arr_copied.buffers()[1].address != arr.buffers()[1].address
+
+        batch_copied = batch.copy_to(dest)
+        assert batch_copied.equals(batch)
+        assert batch_copied["col"].buffers()[1].device == mm.device
+        assert batch_copied["col"].buffers()[1].address != arr.buffers()[1].address
+
+    with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"):
+        arr.copy_to(mm.device.device_type)
+
+    with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"):
+        batch.copy_to(mm.device.device_type)

Reply via email to