This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 41e3bff ARROW-3653: [C++][Python] Support data copying between
different GPU devices
41e3bff is described below
commit 41e3bff035c8f44ff3cb7794b68ebb4d08cb641c
Author: Pearu Peterson <[email protected]>
AuthorDate: Tue Feb 12 15:30:22 2019 +0100
ARROW-3653: [C++][Python] Support data copying between different GPU devices
This PR introduces new C++ methods
`CudaContext::CopyDeviceToAnotherDevice`
`CudaBuffer::CopyFromAnotherDevice`
that are used to implement the Python support for copying CUDA buffer data
between different devices.
Author: Pearu Peterson <[email protected]>
Closes #3617 from pearu/arrow-3653 and squashes the following commits:
4de6c973 <Pearu Peterson> Skip tests on copying data between buffers of
different devices on a single GPU system.
b10afc34 <Pearu Peterson> Support copying data in buffers of different
devices.
---
cpp/src/arrow/gpu/cuda_context.cc | 16 ++++++++++++++++
cpp/src/arrow/gpu/cuda_context.h | 2 ++
cpp/src/arrow/gpu/cuda_memory.cc | 8 ++++++++
cpp/src/arrow/gpu/cuda_memory.h | 11 ++++++++++-
python/pyarrow/_cuda.pyx | 23 +++++++++++++----------
python/pyarrow/includes/libarrow_cuda.pxd | 3 +++
python/pyarrow/tests/test_cuda.py | 14 ++++++++++++--
7 files changed, 64 insertions(+), 13 deletions(-)
diff --git a/cpp/src/arrow/gpu/cuda_context.cc
b/cpp/src/arrow/gpu/cuda_context.cc
index 2f3f1bd..93caa52 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -112,6 +112,16 @@ class CudaContext::CudaContextImpl {
return Status::OK();
}
+ Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>&
dst_ctx, void* dst,
+ const void* src, int64_t nbytes) {
+ ContextSaver set_temporary(context_);
+ CU_RETURN_NOT_OK(cuMemcpyPeer(reinterpret_cast<CUdeviceptr>(dst),
+
reinterpret_cast<CUcontext>(dst_ctx->handle()),
+ reinterpret_cast<const CUdeviceptr>(src),
context_,
+ static_cast<size_t>(nbytes)));
+ return Status::OK();
+ }
+
Status Synchronize(void) {
ContextSaver set_temporary(context_);
CU_RETURN_NOT_OK(cuCtxSynchronize());
@@ -301,6 +311,12 @@ Status CudaContext::CopyDeviceToDevice(void* dst, const
void* src, int64_t nbyte
return impl_->CopyDeviceToDevice(dst, src, nbytes);
}
+Status CudaContext::CopyDeviceToAnotherDevice(const
std::shared_ptr<CudaContext>& dst_ctx,
+ void* dst, const void* src,
+ int64_t nbytes) {
+ return impl_->CopyDeviceToAnotherDevice(dst_ctx, dst, src, nbytes);
+}
+
Status CudaContext::Synchronize(void) { return impl_->Synchronize(); }
Status CudaContext::Close() { return impl_->Close(); }
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 938a815..682cbd8 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -141,6 +141,8 @@ class ARROW_EXPORT CudaContext : public
std::enable_shared_from_this<CudaContext
Status CopyHostToDevice(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToHost(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToDevice(void* dst, const void* src, int64_t nbytes);
+ Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>&
dst_ctx, void* dst,
+ const void* src, int64_t nbytes);
Status Free(void* device_ptr, int64_t nbytes);
class CudaContextImpl;
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index a0da580..e91fef2 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -163,6 +163,14 @@ Status CudaBuffer::CopyFromDevice(const int64_t position,
const void* data,
return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes);
}
+Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>&
src_ctx,
+ const int64_t position, const void*
data,
+ int64_t nbytes) {
+ DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer";
+ return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ +
position, data,
+ nbytes);
+}
+
Status CudaBuffer::ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle) {
if (is_ipc_) {
return Status::Invalid("Buffer has already been exported for IPC");
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 193deed..6b9f04c 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -64,7 +64,7 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
Status CopyToHost(const int64_t position, const int64_t nbytes, void* out)
const;
/// \brief Copy memory to device at position
- /// \param[in] position start position to copy bytes
+ /// \param[in] position start position to copy bytes to
/// \param[in] data the host data to copy
/// \param[in] nbytes number of bytes to copy
/// \return Status
@@ -80,6 +80,15 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
/// memories have been allocated within the same context.
Status CopyFromDevice(const int64_t position, const void* data, int64_t
nbytes);
+ /// \brief Copy memory from another device to device at position
+ /// \param[in] src_ctx context of the source device memory
+ /// \param[in] position start position inside buffer to copy bytes to
+ /// \param[in] data start address of the other device's memory area to copy
from
+ /// \param[in] nbytes number of bytes to copy
+ /// \return Status
+ Status CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
+ const int64_t position, const void* data,
int64_t nbytes);
+
/// \brief Expose this device buffer as IPC memory which can be used in
other processes
/// \param[out] handle the exported IPC handle
/// \return Status
diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx
index eac3dae..fa84fc6 100644
--- a/python/pyarrow/_cuda.pyx
+++ b/python/pyarrow/_cuda.pyx
@@ -561,15 +561,12 @@ cdef class CudaBuffer(Buffer):
def copy_from_device(self, buf, int64_t position=0, int64_t nbytes=-1):
"""Copy data from device to device.
- The destination device buffer must be pre-allocated within the
- same context as source device buffer.
-
Parameters
----------
buf : CudaBuffer
Specify source device buffer.
position : int
- Specify the starting position of the copy in devive buffer.
+ Specify the starting position of the copy in device buffer.
Default: 0.
nbytes : int
Specify the number of bytes to copy. Default: -1 (all from
@@ -581,9 +578,6 @@ cdef class CudaBuffer(Buffer):
Number of bytes copied.
"""
- if self.context.handle != buf.context.handle:
- raise ValueError('device source and destination buffers must be '
- 'within the same context')
if position < 0 or position > self.size:
raise ValueError('position argument is out-of-range')
cdef int64_t nbytes_
@@ -605,9 +599,18 @@ cdef class CudaBuffer(Buffer):
cdef shared_ptr[CCudaBuffer] buf_ = pyarrow_unwrap_cudabuffer(buf)
cdef int64_t position_ = position
- with nogil:
- check_status(self.cuda_buffer.get().
- CopyFromDevice(position_, buf_.get().data(), nbytes_))
+ cdef shared_ptr[CCudaContext] src_ctx_ = pyarrow_unwrap_cudacontext(
+ buf.context)
+ if self.context.handle != buf.context.handle:
+ with nogil:
+ check_status(self.cuda_buffer.get().
+ CopyFromAnotherDevice(src_ctx_, position_,
+ buf_.get().data(), nbytes_))
+ else:
+ with nogil:
+ check_status(self.cuda_buffer.get().
+ CopyFromDevice(position_, buf_.get().data(),
+ nbytes_))
return nbytes_
def export_for_ipc(self):
diff --git a/python/pyarrow/includes/libarrow_cuda.pxd
b/python/pyarrow/includes/libarrow_cuda.pxd
index ef89d9c..ce09115 100644
--- a/python/pyarrow/includes/libarrow_cuda.pxd
+++ b/python/pyarrow/includes/libarrow_cuda.pxd
@@ -71,6 +71,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace
"arrow::cuda" nogil:
int64_t nbytes)
CStatus CopyFromDevice(const int64_t position, const void* data,
int64_t nbytes)
+ CStatus CopyFromAnotherDevice(const shared_ptr[CCudaContext]& src_ctx,
+ const int64_t position, const void* data,
+ int64_t nbytes)
CStatus ExportForIpc(shared_ptr[CCudaIpcMemHandle]* handle)
shared_ptr[CCudaContext] context() const
diff --git a/python/pyarrow/tests/test_cuda.py
b/python/pyarrow/tests/test_cuda.py
index 4633df1..7c56e33 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -40,10 +40,12 @@ cuda_ipc = pytest.mark.skipif(
reason='CUDA IPC not supported in platform `%s`' % (platform))
global_context = None # for flake8
+global_context1 = None # for flake8
def setup_module(module):
module.global_context = cuda.Context(0)
+ module.global_context1 = cuda.Context(cuda.Context.get_num_devices() - 1)
def teardown_module(module):
@@ -53,6 +55,7 @@ def teardown_module(module):
def test_Context():
assert cuda.Context.get_num_devices() > 0
assert global_context.device_number == 0
+ assert global_context1.device_number == cuda.Context.get_num_devices() - 1
with pytest.raises(ValueError,
match=("device_number argument must "
@@ -398,11 +401,18 @@ def test_copy_to_host(size):
dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)
[email protected]("dest_ctx", ['same', 'another'])
@pytest.mark.parametrize("size", [0, 1, 1000])
-def test_copy_from_device(size):
+def test_copy_from_device(dest_ctx, size):
arr, buf = make_random_buffer(size=size, target='device')
lst = arr.tolist()
- dbuf = buf.context.new_buffer(size)
+ if dest_ctx == 'another':
+ dest_ctx = global_context1
+ if buf.context.device_number == dest_ctx.device_number:
+ pytest.skip("not a multi-GPU system")
+ else:
+ dest_ctx = buf.context
+ dbuf = dest_ctx.new_buffer(size)
def put(*args, **kwargs):
dbuf.copy_from_device(buf, *args, **kwargs)