This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 99190d0 ARROW-1424: [Python] Add CUDA support to pyarrow
99190d0 is described below
commit 99190d02b919b11d94df156b398e1492159df027
Author: Pearu Peterson <[email protected]>
AuthorDate: Thu Sep 13 19:07:46 2018 -0400
ARROW-1424: [Python] Add CUDA support to pyarrow
This PR implements CUDA support in pyarrow. In order to use it, the Arrow
C++ library must be built with the following CMake options:
```
-DARROW_PYTHON=on -DARROW_GPU=ON -DARROW_IPC=ON
```
To enable CUDA support in pyarrow, it must be built with the `--with-cuda`
option, e.g.
```
python setup.py build_ext --with-cuda <other options>
```
or the environment must define
```
PYARROW_WITH_CUDA=1
```
This CUDA support implementation is fairly complete: all Arrow C++ GPU
related functions and classes are exposed to Python, the new methods and
functions are documented, and test coverage is close to 100%.
However, there are some issues that need to be tackled and questions to be
answered (hence the WIP attribute):
1. Is the naming convention of the new methods appropriate? Are there any
changes needed in the new API?
2. The IPC test (see `test_IPC` in `python/pyarrow/tests/test_gpu.py`)
fails when calling `open_ipc_buffer`: `cuIpcOpenMemHandle fails with code
201`. Currently, I don't know why; any hint or help on this is much
appreciated. [FIXED: using `multiprocessing.Process` in `spawn` mode]
3. Anything else?
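For context, the `spawn`-mode fix noted in item 2 can be sketched roughly as below. This is a hypothetical minimal illustration of running work in a spawned child process (the names `child` and `q` are invented here, and the CUDA IPC part is only indicated in a comment); it is not the actual test code:
```python
import multiprocessing as mp


def child(q):
    # In the real IPC test the child process would open the exported
    # CUDA IPC handle; here it just reports back, to show the round
    # trip through a spawned child.
    q.put("opened")


if __name__ == "__main__":
    # "spawn" starts a fresh interpreter, so the child inherits no
    # CUDA driver state from the parent (the source of error 201).
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    p = ctx.Process(target=child, args=(q,))
    p.start()
    print(q.get())  # -> opened
    p.join()
```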
Author: Pearu Peterson <[email protected]>
Closes #2536 from pearu/pearu-cuda-pyarrow and squashes the following
commits:
7a018465a <Pearu Peterson> Minor clean up 2.
c86e1651a <Pearu Peterson> Minor clean up.
5e171e8c2 <Pearu Peterson> Revised buffer_from_data.
7555c657b <Pearu Peterson> Raise BufferError in CudaBuffer.__getbuffer__.
Fix pytest.raises usages.
e1bcb0886 <Pearu Peterson> Apply feedback from PR. 2.
38ddfffba <Pearu Peterson> Apply feedback from PR. WIP.
4ece8092a <Pearu Peterson> Remove redundant -DPYARROW_BUILD_CUDA=on
66d704ea5 <Pearu Peterson> Silence flake8 on Arrow CUDA api module.
88961fa35 <Pearu Peterson> cmake: moved Arrow CUDA detection to
FindArrowCuda.
2126eba85 <Pearu Peterson> Fixes for flake8
ff7faa2d1 <Pearu Peterson> Removed DeviceManager, use
'ctx=Context(<device_number>)'. Introduced Context.get_num_devices and
Context.device_number.
f3c41dbae <Pearu Peterson> Added cdefs to CMemoryPool variables
106b07197 <Pearu Peterson> Removed cuda prefix everywhere except in
CudaBuffer.
dff5bd495 <Pearu Peterson> More space removal. Fix CudaBufferWriter and
cuda_read_record_batch docs.
ce1d3bb50 <Pearu Peterson> Remove _freed attribute from CudaHostBuffer
1defcb4c8 <Pearu Peterson> Remove spaces around * and &
94989a7cd <Pearu Peterson> Rename lib_gpu to _cuda, ARROW_GPU to CUDA,
--with-arrow-gpu to --with-cuda. Remove --without-arrow-gpu option. To test
availability of cuda support, try import pyarrow.cuda.
cb89ee302 <Pearu Peterson> Remove usage of FreeHost to avoid double-freeing
issue.
61659c4fe <Pearu Peterson> Introduce pyarrow.cuda module as the CUDA UI.
8faf1ee51 <Pearu Peterson> Add missing import of pandas_compat.
a5447874d <Pearu Peterson> Fix formatting for flake8.
7053df860 <Pearu Peterson> Improve detecting availability of gpu support
(2nd try).
3913e2904 <Pearu Peterson> Improve detecting availability of gpu support.
9e365bd15 <Pearu Peterson> Merge branch 'pearu-cuda-pyarrow' of
github.com:Quansight/arrow into pearu-cuda-pyarrow
d237b3493 <Pearu Peterson> Implement GPU support in pyarrow
584d94ba0 <Pearu Peterson> Flake it.
c2799e645 <Pearu Peterson> Impl IPC tests.
7dc6d8833 <Pearu Peterson> Impl CudaBufferWriter.writeat,
CudaBufferReader.read_buffer. Complete unittests for CudaBufferWriter/Reader
e2b14df8b <Pearu Peterson> Impl CudaBuffer device_buffer and slice methods.
Impl tests for CudaBuffer and CudaHostBuffer.
36396c689 <Pearu Peterson> Test memory management of CudaBuffer and
CudaHostBuffer
5e2c1ba1e <Pearu Peterson> Unittest for allocate/free host buffer.
102f277e0 <Pearu Peterson> Document all methods and functions.
9b8cb1b9f <Pearu Peterson> Complete copy_from/to_host implementations and
tests.
9eebf19aa <Pearu Peterson> Document copy_to_host and copy_from_host
methods. Complete unittests for copy_to_host.
11cba54d2 <Pearu Peterson> Add copy_to_host and copy_from_host methods to
CudaBuffer
ae3cd3fda <Pearu Peterson> Exposed all Arrow GPU C++ classes and functions
to pyarrow. WIP
cf42941e9 <Pearu Peterson> Expose Message for lib_gpu.
49b0190ba <Pearu Peterson> Expose CudaDeviceManager, CudaContext,
CudaIpcMemHandle to pyarrow.
a2d755791 <Pearu Peterson> Minimal GPU support for pyarrow. WIP.
---
cpp/cmake_modules/FindArrowCuda.cmake | 124 +++++
python/CMakeLists.txt | 32 ++
python/pyarrow/_cuda.pxd | 62 +++
python/pyarrow/_cuda.pyx | 751 ++++++++++++++++++++++++++++++
python/pyarrow/array.pxi | 1 +
python/pyarrow/cuda.py | 24 +
python/pyarrow/includes/libarrow_cuda.pxd | 105 +++++
python/pyarrow/ipc.pxi | 3 -
python/pyarrow/lib.pxd | 4 +
python/pyarrow/tests/test_cuda.py | 579 +++++++++++++++++++++++
python/setup.py | 12 +-
11 files changed, 1693 insertions(+), 4 deletions(-)
diff --git a/cpp/cmake_modules/FindArrowCuda.cmake b/cpp/cmake_modules/FindArrowCuda.cmake
new file mode 100644
index 0000000..8733b61
--- /dev/null
+++ b/cpp/cmake_modules/FindArrowCuda.cmake
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find ARROW CUDA (arrow/gpu/cuda_api.h, libarrow_gpu.a, libarrow_gpu.so)
+#
+# This module requires Arrow from which it uses
+# ARROW_FOUND
+# ARROW_SEARCH_HEADER_PATHS
+# ARROW_SEARCH_LIB_PATH
+# ARROW_HOME
+#
+# This module defines
+# ARROW_CUDA_INCLUDE_DIR, directory containing headers
+# ARROW_CUDA_LIBS, directory containing the Arrow CUDA libraries
+# ARROW_CUDA_STATIC_LIB, path to libarrow_gpu.a
+# ARROW_CUDA_SHARED_LIB, path to libarrow_gpu's shared library
+# ARROW_CUDA_SHARED_IMP_LIB, path to libarrow_gpu's import library (MSVC only)
+# ARROW_CUDA_FOUND, whether Arrow CUDA has been found
+
+#
+# TODO(ARROW-3209): rename arrow/gpu to arrow/cuda, arrow_gpu to arrow_cuda
+#
+
+include(FindPkgConfig)
+include(GNUInstallDirs)
+
+if (NOT DEFINED ARROW_FOUND)
+ if (ArrowCuda_FIND_REQUIRED)
+ find_package(Arrow REQUIRED)
+ else()
+ find_package(Arrow)
+ endif()
+endif()
+
+if (NOT ARROW_FOUND)
+ set(ARROW_CUDA_FOUND FALSE)
+ return()
+endif()
+
+find_path(ARROW_CUDA_INCLUDE_DIR arrow/gpu/cuda_api.h PATHS
+ ${ARROW_SEARCH_HEADER_PATHS}
+ NO_DEFAULT_PATH
+ )
+
+if (NOT (ARROW_CUDA_INCLUDE_DIR STREQUAL ARROW_INCLUDE_DIR))
+  set(ARROW_CUDA_WARN_MSG "Mismatch of Arrow and Arrow CUDA include directories:")
+  set(ARROW_CUDA_WARN_MSG "${ARROW_CUDA_WARN_MSG} ARROW_INCLUDE_DIR=${ARROW_INCLUDE_DIR}")
+  set(ARROW_CUDA_WARN_MSG "${ARROW_CUDA_WARN_MSG} ARROW_CUDA_INCLUDE_DIR=${ARROW_CUDA_INCLUDE_DIR}")
+ message(WARNING ${ARROW_CUDA_WARN_MSG})
+endif()
+
+find_library(ARROW_CUDA_LIB_PATH NAMES arrow_gpu
+ PATHS
+ ${ARROW_SEARCH_LIB_PATH}
+ NO_DEFAULT_PATH)
+get_filename_component(ARROW_CUDA_LIBS ${ARROW_CUDA_LIB_PATH} DIRECTORY)
+
+if (MSVC)
+ find_library(ARROW_CUDA_SHARED_LIBRARIES NAMES arrow_gpu
+ PATHS ${ARROW_HOME} NO_DEFAULT_PATH
+ PATH_SUFFIXES "bin" )
+  get_filename_component(ARROW_CUDA_SHARED_LIBS ${ARROW_CUDA_SHARED_LIBRARIES} PATH)
+endif()
+
+
+if (ARROW_CUDA_INCLUDE_DIR AND ARROW_CUDA_LIBS)
+ set(ARROW_CUDA_FOUND TRUE)
+ set(ARROW_CUDA_LIB_NAME arrow_gpu)
+ if (MSVC)
+    set(ARROW_CUDA_STATIC_LIB ${ARROW_CUDA_LIBS}/${ARROW_CUDA_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
+    set(ARROW_CUDA_SHARED_LIB ${ARROW_CUDA_SHARED_LIBS}/${ARROW_CUDA_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+    set(ARROW_CUDA_SHARED_IMP_LIB ${ARROW_CUDA_LIBS}/${ARROW_CUDA_LIB_NAME}.lib)
+ else()
+ set(ARROW_CUDA_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_CUDA_LIB_NAME}.a)
+    set(ARROW_CUDA_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_CUDA_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+ endif()
+endif()
+
+if (ARROW_CUDA_FOUND)
+ if (NOT ArrowCuda_FIND_QUIETLY)
+ message(STATUS "Found the Arrow CUDA library: ${ARROW_CUDA_LIB_PATH}")
+ endif()
+else()
+ if (NOT ArrowCuda_FIND_QUIETLY)
+    set(ARROW_CUDA_ERR_MSG "Could not find the Arrow CUDA library. Looked for headers")
+    set(ARROW_CUDA_ERR_MSG "${ARROW_CUDA_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs")
+    set(ARROW_CUDA_ERR_MSG "${ARROW_CUDA_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
+ if (ArrowCuda_FIND_REQUIRED)
+ message(FATAL_ERROR "${ARROW_CUDA_ERR_MSG}")
+ else(ArrowCuda_FIND_REQUIRED)
+ message(STATUS "${ARROW_CUDA_ERR_MSG}")
+ endif (ArrowCuda_FIND_REQUIRED)
+ endif ()
+ set(ARROW_CUDA_FOUND FALSE)
+endif ()
+
+if (MSVC)
+ mark_as_advanced(
+ ARROW_CUDA_INCLUDE_DIR
+ ARROW_CUDA_STATIC_LIB
+ ARROW_CUDA_SHARED_LIB
+ ARROW_CUDA_SHARED_IMP_LIB
+ )
+else()
+ mark_as_advanced(
+ ARROW_CUDA_INCLUDE_DIR
+ ARROW_CUDA_STATIC_LIB
+ ARROW_CUDA_SHARED_LIB
+ )
+endif()
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 883b2dc..8ec5b1a 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -17,6 +17,9 @@
#
# Includes code assembled from BSD/MIT/Apache-licensed code from some 3rd-party
# projects, including Kudu, Impala, and libdynd. See python/LICENSE.txt
+#
+# TODO(ARROW-3209): rename arrow_gpu to arrow_cuda
+#
cmake_minimum_required(VERSION 2.7)
project(pyarrow)
@@ -58,6 +61,9 @@ endif()
# Top level cmake dir
if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
+ option(PYARROW_BUILD_CUDA
+ "Build the PyArrow CUDA support"
+ OFF)
option(PYARROW_BUILD_PARQUET
"Build the PyArrow Parquet integration"
OFF)
@@ -368,6 +374,32 @@ set(LINK_LIBS
arrow_python_shared
)
+if (PYARROW_BUILD_CUDA)
+ ## Arrow CUDA
+ find_package(ArrowCuda)
+ if(NOT ARROW_CUDA_FOUND)
+ message(FATAL_ERROR "Unable to locate Arrow CUDA libraries")
+ else()
+ if (PYARROW_BUNDLE_ARROW_CPP)
+ bundle_arrow_lib(ARROW_CUDA_SHARED_LIB
+ ABI_VERSION ${ARROW_ABI_VERSION}
+ SO_VERSION ${ARROW_SO_VERSION})
+ if (MSVC)
+ bundle_arrow_implib(ARROW_CUDA_SHARED_IMP_LIB)
+ endif()
+ endif()
+ if (MSVC)
+ ADD_THIRDPARTY_LIB(arrow_gpu
+ SHARED_LIB ${ARROW_CUDA_SHARED_IMP_LIB})
+ else()
+ ADD_THIRDPARTY_LIB(arrow_gpu
+ SHARED_LIB ${ARROW_CUDA_SHARED_LIB})
+ endif()
+ set(LINK_LIBS ${LINK_LIBS} arrow_gpu_shared)
+ set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _cuda)
+ endif()
+endif()
+
if (PYARROW_BUILD_PARQUET)
## Parquet
find_package(Parquet)
diff --git a/python/pyarrow/_cuda.pxd b/python/pyarrow/_cuda.pxd
new file mode 100644
index 0000000..919daee
--- /dev/null
+++ b/python/pyarrow/_cuda.pxd
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.lib cimport *
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_cuda cimport *
+
+
+cdef class Context:
+ cdef:
+ shared_ptr[CCudaContext] context
+ int device_number
+
+ cdef void init(self, const shared_ptr[CCudaContext]& ctx)
+
+
+cdef class IpcMemHandle:
+ cdef:
+ shared_ptr[CCudaIpcMemHandle] handle
+
+ cdef void init(self, shared_ptr[CCudaIpcMemHandle]& h)
+
+
+cdef class CudaBuffer(Buffer):
+ cdef:
+ shared_ptr[CCudaBuffer] cuda_buffer
+
+ cdef void init_cuda(self, const shared_ptr[CCudaBuffer]& buffer)
+
+
+cdef class HostBuffer(Buffer):
+ cdef:
+ shared_ptr[CCudaHostBuffer] host_buffer
+
+ cdef void init_host(self, const shared_ptr[CCudaHostBuffer]& buffer)
+
+
+cdef class BufferReader(NativeFile):
+ cdef:
+ CCudaBufferReader* reader
+ CudaBuffer buffer
+
+
+cdef class BufferWriter(NativeFile):
+ cdef:
+ CCudaBufferWriter* writer
+ CudaBuffer buffer
diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx
new file mode 100644
index 0000000..74e8555
--- /dev/null
+++ b/python/pyarrow/_cuda.pyx
@@ -0,0 +1,751 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.compat import tobytes
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow_cuda cimport *
+from pyarrow.lib import py_buffer, allocate_buffer, as_buffer
+cimport cpython as cp
+
+
+cdef class Context:
+ """ CUDA driver context.
+ """
+
+ def __cinit__(self, int device_number=0):
+ """Construct the shared CUDA driver context for a particular device.
+
+ Parameters
+ ----------
+ device_number : int
+ Specify the gpu device for which the CUDA driver context is
+ requested.
+
+ """
+ cdef CCudaDeviceManager* manager
+ check_status(CCudaDeviceManager.GetInstance(&manager))
+ cdef int n = manager.num_devices()
+ if device_number >= n or device_number < 0:
+ self.context.reset()
+            raise ValueError('device_number argument must be '
+                             'non-negative and less than %s' % (n))
+ check_status(manager.GetContext(device_number, &self.context))
+ self.device_number = device_number
+
+ @staticmethod
+ def get_num_devices():
+ """ Return the number of GPU devices.
+ """
+ cdef CCudaDeviceManager* manager
+ check_status(CCudaDeviceManager.GetInstance(&manager))
+ return manager.num_devices()
+
+ @property
+ def device_number(self):
+ """ Return context device number.
+ """
+ return self.device_number
+
+ cdef void init(self, const shared_ptr[CCudaContext]& ctx):
+ self.context = ctx
+
+ @property
+ def bytes_allocated(self):
+ """ Return the number of allocated bytes.
+ """
+ return self.context.get().bytes_allocated()
+
+ def new_buffer(self, nbytes):
+ """Return new device buffer.
+
+ Parameters
+ ----------
+ nbytes : int
+ Specify the number of bytes to be allocated.
+
+ Returns
+ -------
+ buf : CudaBuffer
+ Allocated buffer.
+ """
+ cdef shared_ptr[CCudaBuffer] cudabuf
+ check_status(self.context.get().Allocate(nbytes, &cudabuf))
+ return pyarrow_wrap_cudabuffer(cudabuf)
+
+ def open_ipc_buffer(self, ipc_handle):
+ """ Open existing CUDA IPC memory handle
+
+ Parameters
+ ----------
+ ipc_handle : IpcMemHandle
+ Specify opaque pointer to CUipcMemHandle (driver API).
+
+ Returns
+ -------
+ buf : CudaBuffer
+ referencing device buffer
+ """
+ handle = pyarrow_unwrap_cudaipcmemhandle(ipc_handle)
+ cdef shared_ptr[CCudaBuffer] cudabuf
+ check_status(self.context.get().OpenIpcBuffer(handle.get()[0],
+ &cudabuf))
+ return pyarrow_wrap_cudabuffer(cudabuf)
+
+ def buffer_from_data(self, object data, int64_t offset=0, int64_t size=-1):
+        """Create device buffer and initialize it with data.
+
+ Parameters
+ ----------
+ data : {HostBuffer, Buffer, array-like}
+ Specify data to be copied to device buffer.
+ offset : int
+ Specify the offset of input buffer for device data
+ buffering. Default: 0.
+ size : int
+ Specify the size of device buffer in bytes. Default: all
+ (starting from input offset)
+
+ Returns
+ -------
+ cbuf : CudaBuffer
+ Device buffer with copied data.
+ """
+ if pyarrow_is_cudabuffer(data):
+ raise NotImplementedError('copying data from device to device')
+
+ buf = as_buffer(data)
+
+ bsize = buf.size
+ if offset < 0 or offset >= bsize:
+ raise ValueError('offset argument is out-of-range')
+ if size < 0:
+ size = bsize - offset
+ elif offset + size > bsize:
+ raise ValueError(
+ 'requested larger slice than available in device buffer')
+
+ if offset != 0 or size != bsize:
+ buf = buf.slice(offset, size)
+
+ result = self.new_buffer(size)
+ result.copy_from_host(buf, position=0, nbytes=size)
+ return result
+
+
+cdef class IpcMemHandle:
+ """A container for a CUDA IPC handle.
+ """
+ cdef void init(self, shared_ptr[CCudaIpcMemHandle]& h):
+ self.handle = h
+
+ @staticmethod
+ def from_buffer(Buffer opaque_handle):
+ """Create IpcMemHandle from opaque buffer (e.g. from another
+ process)
+
+ Parameters
+ ----------
+ opaque_handle :
+ a CUipcMemHandle as a const void*
+
+        Returns
+ -------
+ ipc_handle : IpcMemHandle
+ """
+ cdef shared_ptr[CCudaIpcMemHandle] handle
+ buf_ = pyarrow_unwrap_buffer(opaque_handle)
+ check_status(CCudaIpcMemHandle.FromBuffer(buf_.get().data(), &handle))
+ return pyarrow_wrap_cudaipcmemhandle(handle)
+
+ def serialize(self, pool=None):
+ """Write IpcMemHandle to a Buffer
+
+ Parameters
+ ----------
+ pool : {MemoryPool, None}
+ Specify a pool to allocate memory from
+
+ Returns
+ -------
+ buf : Buffer
+ The serialized buffer.
+ """
+ cdef CMemoryPool* pool_ = maybe_unbox_memory_pool(pool)
+ cdef shared_ptr[CBuffer] buf
+ cdef CCudaIpcMemHandle* h = self.handle.get()
+ check_status(h.Serialize(pool_, &buf))
+ return pyarrow_wrap_buffer(buf)
+
+
+cdef class CudaBuffer(Buffer):
+ """An Arrow buffer with data located in a GPU device.
+
+ To create a CudaBuffer instance, use
+
+      <Context instance>.buffer_from_data(data=<object>, offset=<offset>,
+                                          size=<nbytes>)
+
+ The memory allocated in CudaBuffer instance is freed when the
+ instance is deleted.
+
+ """
+
+ def __init__(self):
+        raise TypeError("Do not call CudaBuffer's constructor directly, use "
+                        "`<pyarrow.Context instance>.buffer_from_data`"
+                        " method instead.")
+
+ cdef void init_cuda(self, const shared_ptr[CCudaBuffer]& buffer):
+ self.cuda_buffer = buffer
+ self.init(<shared_ptr[CBuffer]> buffer)
+
+ @staticmethod
+ def from_buffer(buf):
+        """Convert a generic buffer back into a CudaBuffer
+
+ Parameters
+ ----------
+ buf : Buffer
+ Specify buffer containing CudaBuffer
+
+ Returns
+ -------
+ dbuf : CudaBuffer
+ Resulting device buffer.
+ """
+ buf_ = pyarrow_unwrap_buffer(buf)
+ cdef shared_ptr[CCudaBuffer] cbuf
+ check_status(CCudaBuffer.FromBuffer(buf_, &cbuf))
+ return pyarrow_wrap_cudabuffer(cbuf)
+
+ cdef getitem(self, int64_t i):
+ return self.copy_to_host(position=i, nbytes=1)[0]
+
+ def copy_to_host(self, int64_t position=0, int64_t nbytes=-1,
+ Buffer buf=None,
+ MemoryPool memory_pool=None, c_bool resizable=False):
+ """Copy memory from GPU device to CPU host
+
+ Parameters
+ ----------
+ position : int
+ Specify the starting position of the source data in GPU
+ device buffer. Default: 0.
+ nbytes : int
+ Specify the number of bytes to copy. Default: -1 (all from
+ the position until host buffer is full).
+ buf : Buffer
+ Specify a pre-allocated output buffer in host. Default: None
+ (allocate new output buffer).
+ memory_pool : MemoryPool
+ resizable : bool
+ Specify extra arguments to allocate_buffer. Used only when
+ buf is None.
+
+ Returns
+ -------
+ buf : Buffer
+ Output buffer in host.
+
+ """
+ if position < 0 or position > self.size:
+ raise ValueError('position argument is out-of-range')
+ cdef int64_t nbytes_
+ if buf is None:
+ if nbytes < 0:
+ # copy all starting from position to new host buffer
+ nbytes_ = self.size - position
+ else:
+ if nbytes > self.size - position:
+ raise ValueError(
+ 'requested more to copy than available from '
+ 'device buffer')
+                # copy nbytes starting from position to new host buffer
+ nbytes_ = nbytes
+ buf = allocate_buffer(nbytes_, memory_pool=memory_pool,
+ resizable=resizable)
+ else:
+ if nbytes < 0:
+ # copy all from position until given host buffer is full
+ nbytes_ = min(self.size - position, buf.size)
+ else:
+ if nbytes > buf.size:
+ raise ValueError(
+ 'requested copy does not fit into host buffer')
+ # copy nbytes from position to given host buffer
+ nbytes_ = nbytes
+ cdef shared_ptr[CBuffer] buf_ = pyarrow_unwrap_buffer(buf)
+ cdef int64_t position_ = position
+ with nogil:
+ check_status(self.cuda_buffer.get()
+ .CopyToHost(position_, nbytes_,
+ buf_.get().mutable_data()))
+ return buf
+
+ def copy_from_host(self, data, int64_t position=0, int64_t nbytes=-1):
+ """Copy data from host to device.
+
+ The device buffer must be pre-allocated.
+
+ Parameters
+ ----------
+ data : {Buffer, array-like}
+ Specify data in host. It can be array-like that is valid
+ argument to py_buffer
+ position : int
+            Specify the starting position of the copy in device buffer.
+ Default: 0.
+ nbytes : int
+ Specify the number of bytes to copy. Default: -1 (all from
+ source until device buffer, starting from position, is full)
+
+ Returns
+ -------
+ nbytes : int
+ Number of bytes copied.
+ """
+ if position < 0 or position > self.size:
+ raise ValueError('position argument is out-of-range')
+ cdef int64_t nbytes_
+ buf = as_buffer(data)
+
+ if nbytes < 0:
+ # copy from host buffer to device buffer starting from
+ # position until device buffer is full
+ nbytes_ = min(self.size - position, buf.size)
+ else:
+ if nbytes > buf.size:
+ raise ValueError(
+ 'requested more to copy than available from host buffer')
+ if nbytes > self.size - position:
+ raise ValueError(
+ 'requested more to copy than available in device buffer')
+ # copy nbytes from host buffer to device buffer starting
+ # from position
+ nbytes_ = nbytes
+
+ cdef shared_ptr[CBuffer] buf_ = pyarrow_unwrap_buffer(buf)
+ cdef int64_t position_ = position
+ with nogil:
+ check_status(self.cuda_buffer.get().
+ CopyFromHost(position_, buf_.get().data(), nbytes_))
+ return nbytes_
+
+ def export_for_ipc(self):
+ """
+ Expose this device buffer as IPC memory which can be used in other
+ processes.
+
+ After calling this function, this device memory will not be
+ freed when the CudaBuffer is destructed.
+
+        Returns
+ -------
+ ipc_handle : IpcMemHandle
+ The exported IPC handle
+
+ """
+ cdef shared_ptr[CCudaIpcMemHandle] handle
+ check_status(self.cuda_buffer.get().ExportForIpc(&handle))
+ return pyarrow_wrap_cudaipcmemhandle(handle)
+
+ @property
+ def context(self):
+ """Returns the CUDA driver context of this buffer.
+ """
+ return pyarrow_wrap_cudacontext(self.cuda_buffer.get().context())
+
+ def slice(self, offset=0, length=None):
+ """Return slice of device buffer
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Specify offset from the start of device buffer to slice
+ length : int, default None
+ Specify the length of slice (default is until end of device
+ buffer starting from offset)
+
+ Returns
+ -------
+ sliced : CudaBuffer
+ Zero-copy slice of device buffer.
+ """
+ if offset < 0 or offset >= self.size:
+ raise ValueError('offset argument is out-of-range')
+ cdef int64_t offset_ = offset
+ cdef int64_t size
+ if length is None:
+ size = self.size - offset_
+ elif offset + length <= self.size:
+ size = length
+ else:
+ raise ValueError(
+ 'requested larger slice than available in device buffer')
+ parent = pyarrow_unwrap_cudabuffer(self)
+        return pyarrow_wrap_cudabuffer(
+            make_shared[CCudaBuffer](parent, offset_, size))
+
+ def to_pybytes(self):
+ """Return device buffer content as Python bytes.
+ """
+ return self.copy_to_host().to_pybytes()
+
+ def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
+ # Device buffer contains data pointers on the device. Hence,
+ # cannot support buffer protocol PEP-3118 for CudaBuffer.
+ raise BufferError('buffer protocol for device buffer not supported')
+
+ def __getreadbuffer__(self, Py_ssize_t idx, void** p):
+ # Python 2.x specific method
+ raise NotImplementedError('CudaBuffer.__getreadbuffer__')
+
+ def __getwritebuffer__(self, Py_ssize_t idx, void** p):
+ # Python 2.x specific method
+ raise NotImplementedError('CudaBuffer.__getwritebuffer__')
+
+
+cdef class HostBuffer(Buffer):
+ """Device-accessible CPU memory created using cudaHostAlloc.
+
+ To create a HostBuffer instance, use
+
+ cuda.new_host_buffer(<nbytes>)
+ """
+
+ def __init__(self):
+ raise TypeError("Do not call HostBuffer's constructor directly,"
+ " use `cuda.new_host_buffer` function instead.")
+
+ cdef void init_host(self, const shared_ptr[CCudaHostBuffer]& buffer):
+ self.host_buffer = buffer
+ self.init(<shared_ptr[CBuffer]> buffer)
+
+ @property
+ def size(self):
+ return self.host_buffer.get().size()
+
+
+cdef class BufferReader(NativeFile):
+ """File interface for zero-copy read from CUDA buffers.
+
+ Note: Read methods return pointers to device memory. This means
+ you must be careful using this interface with any Arrow code which
+ may expect to be able to do anything other than pointer arithmetic
+ on the returned buffers.
+ """
+ def __cinit__(self, CudaBuffer obj):
+ self.buffer = obj
+ self.reader = new CCudaBufferReader(self.buffer.buffer)
+ self.rd_file.reset(self.reader)
+ self.is_readable = True
+ self.closed = False
+
+ def read_buffer(self, nbytes=None):
+ """Return a slice view of the underlying device buffer.
+
+ The slice will start at the current reader position and will
+ have specified size in bytes.
+
+ Parameters
+ ----------
+ nbytes : int, default None
+ Specify the number of bytes to read. Default: None (read all
+ remaining bytes).
+
+ Returns
+ -------
+ cbuf : CudaBuffer
+ New device buffer.
+
+ """
+ cdef:
+ int64_t c_nbytes
+ int64_t bytes_read = 0
+ shared_ptr[CCudaBuffer] output
+
+ if nbytes is None:
+ c_nbytes = self.size() - self.tell()
+ else:
+ c_nbytes = nbytes
+
+ with nogil:
+ check_status(self.reader.Read(c_nbytes,
+ <shared_ptr[CBuffer]*> &output))
+
+ return pyarrow_wrap_cudabuffer(output)
+
+
+cdef class BufferWriter(NativeFile):
+ """File interface for writing to CUDA buffers.
+
+ By default writes are unbuffered. Use set_buffer_size to enable
+ buffering.
+ """
+ def __cinit__(self, CudaBuffer buffer):
+ self.buffer = buffer
+ self.writer = new CCudaBufferWriter(self.buffer.cuda_buffer)
+ self.wr_file.reset(self.writer)
+ self.is_writable = True
+ self.closed = False
+
+ def writeat(self, int64_t position, object data):
+ """Write data to buffer starting from position.
+
+ Parameters
+ ----------
+ position : int
+ Specify device buffer position where the data will be
+ written.
+ data : array-like
+ Specify data, the data instance must implement buffer
+ protocol.
+ """
+ cdef Buffer arrow_buffer = as_buffer(data)
+ cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
+ cdef int64_t bufsize = len(arrow_buffer)
+ with nogil:
+ check_status(self.writer.WriteAt(position, buf, bufsize))
+
+ def flush(self):
+ """ Flush the buffer stream """
+ with nogil:
+ check_status(self.writer.Flush())
+
+ def seek(self, int64_t position, int whence=0):
+ # TODO: remove this method after NativeFile.seek supports
+ # writeable files.
+ cdef int64_t offset
+
+ with nogil:
+ if whence == 0:
+ offset = position
+ elif whence == 1:
+ check_status(self.writer.Tell(&offset))
+ offset = offset + position
+ else:
+ with gil:
+ raise ValueError("Invalid value of whence: {0}"
+ .format(whence))
+ check_status(self.writer.Seek(offset))
+ return self.tell()
+
+ @property
+ def buffer_size(self):
+ """Returns size of host (CPU) buffer, 0 for unbuffered
+ """
+ return self.writer.buffer_size()
+
+ @buffer_size.setter
+ def buffer_size(self, int64_t buffer_size):
+ """Set CPU buffer size to limit calls to cudaMemcpy
+
+ Parameters
+ ----------
+ buffer_size : int
+ Specify the size of CPU buffer to allocate in bytes.
+ """
+ with nogil:
+ check_status(self.writer.SetBufferSize(buffer_size))
+
+ @property
+ def num_bytes_buffered(self):
+ """Returns number of bytes buffered on host
+ """
+ return self.writer.num_bytes_buffered()
+
+# Functions
+
+
+def new_host_buffer(const int64_t size):
+ """Return buffer with CUDA-accessible memory on CPU host
+
+ Parameters
+ ----------
+ size : int
+ Specify the number of bytes to be allocated.
+
+ Returns
+ -------
+ dbuf : HostBuffer
+ Allocated host buffer
+ """
+ cdef shared_ptr[CCudaHostBuffer] buffer
+ check_status(AllocateCudaHostBuffer(size, &buffer))
+ return pyarrow_wrap_cudahostbuffer(buffer)
+
+
+def serialize_record_batch(object batch, object ctx):
+ """ Write record batch message to GPU device memory
+
+ Parameters
+ ----------
+ batch : RecordBatch
+ Specify record batch to write
+ ctx : Context
+ Specify context to allocate device memory from
+
+ Returns
+ -------
+ dbuf : CudaBuffer
+ device buffer which contains the record batch message
+ """
+ cdef shared_ptr[CCudaBuffer] buffer
+ cdef CRecordBatch* batch_ = pyarrow_unwrap_batch(batch).get()
+ cdef CCudaContext* ctx_ = pyarrow_unwrap_cudacontext(ctx).get()
+ with nogil:
+ check_status(CudaSerializeRecordBatch(batch_[0], ctx_, &buffer))
+ return pyarrow_wrap_cudabuffer(buffer)
+
+
+def read_message(object source, pool=None):
+ """ Read Arrow IPC message located on GPU device
+
+ Parameters
+ ----------
+ source : {CudaBuffer, cuda.BufferReader}
+ Specify device buffer or reader of device buffer.
+ pool : {MemoryPool, None}
+ Specify pool to allocate CPU memory for the metadata
+
+ Returns
+ -------
+ message : Message
+ the deserialized message, body still on device
+ """
+    cdef:
+        Message result = Message.__new__(Message)
+        CMemoryPool* pool_ = maybe_unbox_memory_pool(pool)
+        BufferReader reader
+    if isinstance(source, BufferReader):
+        reader = source
+    else:
+        reader = BufferReader(source)
+    check_status(CudaReadMessage(reader.reader, pool_, &result.message))
+    return result
+
+
+def read_record_batch(object schema, object buffer, pool=None):
+ """Construct RecordBatch referencing IPC message located on CUDA device.
+
+ While the metadata is copied to host memory for deserialization,
+ the record batch data remains on the device.
+
+ Parameters
+ ----------
+ schema : Schema
+ Specify schema for the record batch
+ buffer :
+ Specify device buffer containing the complete IPC message
+ pool : {MemoryPool, None}
+ Specify pool to use for allocating space for the metadata
+
+ Returns
+ -------
+ batch : RecordBatch
+ reconstructed record batch, with device pointers
+
+ """
+ cdef shared_ptr[CSchema] schema_ = pyarrow_unwrap_schema(schema)
+ cdef shared_ptr[CCudaBuffer] buffer_ = pyarrow_unwrap_cudabuffer(buffer)
+ cdef CMemoryPool* pool_ = maybe_unbox_memory_pool(pool)
+ cdef shared_ptr[CRecordBatch] batch
+ check_status(CudaReadRecordBatch(schema_, buffer_, pool_, &batch))
+ return pyarrow_wrap_batch(batch)
+
+
+# Public API
+
+
+cdef public api bint pyarrow_is_buffer(object buffer):
+ return isinstance(buffer, Buffer)
+
+# cudabuffer
+
+cdef public api bint pyarrow_is_cudabuffer(object buffer):
+ return isinstance(buffer, CudaBuffer)
+
+
+cdef public api object \
+ pyarrow_wrap_cudabuffer(const shared_ptr[CCudaBuffer]& buf):
+ cdef CudaBuffer result = CudaBuffer.__new__(CudaBuffer)
+ result.init_cuda(buf)
+ return result
+
+
+cdef public api shared_ptr[CCudaBuffer] pyarrow_unwrap_cudabuffer(object obj):
+ if pyarrow_is_cudabuffer(obj):
+ return (<CudaBuffer>obj).cuda_buffer
+ raise TypeError('expected CudaBuffer instance, got %s'
+ % (type(obj).__name__))
+
+# cudahostbuffer
+
+cdef public api bint pyarrow_is_cudahostbuffer(object buffer):
+ return isinstance(buffer, HostBuffer)
+
+
+cdef public api object \
+ pyarrow_wrap_cudahostbuffer(const shared_ptr[CCudaHostBuffer]& buf):
+ cdef HostBuffer result = HostBuffer.__new__(HostBuffer)
+ result.init_host(buf)
+ return result
+
+
+cdef public api shared_ptr[CCudaHostBuffer] \
+ pyarrow_unwrap_cudahostbuffer(object obj):
+ if pyarrow_is_cudahostbuffer(obj):
+ return (<HostBuffer>obj).host_buffer
+ raise TypeError('expected HostBuffer instance, got %s'
+ % (type(obj).__name__))
+
+# cudacontext
+
+cdef public api bint pyarrow_is_cudacontext(object ctx):
+ return isinstance(ctx, Context)
+
+
+cdef public api object \
+ pyarrow_wrap_cudacontext(const shared_ptr[CCudaContext]& ctx):
+ cdef Context result = Context.__new__(Context)
+ result.init(ctx)
+ return result
+
+
+cdef public api shared_ptr[CCudaContext] \
+ pyarrow_unwrap_cudacontext(object obj):
+ if pyarrow_is_cudacontext(obj):
+ return (<Context>obj).context
+ raise TypeError('expected Context instance, got %s'
+ % (type(obj).__name__))
+
+# cudaipcmemhandle
+
+cdef public api bint pyarrow_is_cudaipcmemhandle(object handle):
+ return isinstance(handle, IpcMemHandle)
+
+
+cdef public api object \
+ pyarrow_wrap_cudaipcmemhandle(shared_ptr[CCudaIpcMemHandle]& h):
+ cdef IpcMemHandle result = IpcMemHandle.__new__(IpcMemHandle)
+ result.init(h)
+ return result
+
+
+cdef public api shared_ptr[CCudaIpcMemHandle] \
+ pyarrow_unwrap_cudaipcmemhandle(object obj):
+ if pyarrow_is_cudaipcmemhandle(obj):
+ return (<IpcMemHandle>obj).handle
+ raise TypeError('expected IpcMemHandle instance, got %s'
+ % (type(obj).__name__))
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 52af717..8962ac4 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -163,6 +163,7 @@ def array(object obj, type=None, mask=None, size=None, bint from_pandas=False,
from_pandas=from_pandas, safe=safe,
memory_pool=memory_pool)
else:
+ import pyarrow.pandas_compat as pdcompat
values, type = pdcompat.get_datetimetz_type(values, obj.dtype,
type)
return _ndarray_to_array(values, mask, type, from_pandas, safe,
diff --git a/python/pyarrow/cuda.py b/python/pyarrow/cuda.py
new file mode 100644
index 0000000..29a217c
--- /dev/null
+++ b/python/pyarrow/cuda.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# flake8: noqa
+
+from pyarrow._cuda import (Context, IpcMemHandle, CudaBuffer,
+ HostBuffer, BufferReader, BufferWriter,
+ new_host_buffer,
+ serialize_record_batch, read_message,
+ read_record_batch)
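The names re-exported above can be sketched in use as follows. This is a minimal, hedged example under the same availability guard (the function name `cuda_basics` is illustrative): `new_host_buffer` allocates pinned host memory, while `Context.buffer_from_data` copies host bytes into a fresh device buffer.

```python
# Sketch only: basic pyarrow.cuda usage.  Returns None when
# pyarrow.cuda or a CUDA device is not available.
def cuda_basics():
    try:
        from pyarrow import cuda
        ctx = cuda.Context(0)
    except Exception:
        return None  # CUDA support or device not available
    hbuf = cuda.new_host_buffer(16)            # pinned (page-locked) host memory
    dbuf = ctx.buffer_from_data(b'\x00' * 16)  # host bytes copied to device
    return hbuf.size == dbuf.size == 16

result = cuda_basics()
```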
diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd
new file mode 100644
index 0000000..aca6006
--- /dev/null
+++ b/python/pyarrow/includes/libarrow_cuda.pxd
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.libarrow cimport *
+
+cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil:
+
+ cdef cppclass CCudaDeviceManager" arrow::gpu::CudaDeviceManager":
+ @staticmethod
+ CStatus GetInstance(CCudaDeviceManager** manager)
+ CStatus GetContext(int gpu_number, shared_ptr[CCudaContext]* ctx)
+ # CStatus CreateNewContext(int gpu_number,
+ # shared_ptr[CCudaContext]* ctx)
+ CStatus AllocateHost(int64_t nbytes,
+ shared_ptr[CCudaHostBuffer]* buffer)
+ # CStatus FreeHost(void* data, int64_t nbytes)
+ int num_devices() const
+
+ cdef cppclass CCudaContext" arrow::gpu::CudaContext":
+ shared_ptr[CCudaContext] shared_from_this()
+ # CStatus Close()
+ CStatus Allocate(int64_t nbytes, shared_ptr[CCudaBuffer]* out)
+ CStatus OpenIpcBuffer(const CCudaIpcMemHandle& ipc_handle,
+ shared_ptr[CCudaBuffer]* buffer)
+ int64_t bytes_allocated() const
+
+ cdef cppclass CCudaIpcMemHandle" arrow::gpu::CudaIpcMemHandle":
+ @staticmethod
+ CStatus FromBuffer(const void* opaque_handle,
+ shared_ptr[CCudaIpcMemHandle]* handle)
+ CStatus Serialize(CMemoryPool* pool, shared_ptr[CBuffer]* out) const
+
+ cdef cppclass CCudaBuffer" arrow::gpu::CudaBuffer"(CBuffer):
+ CCudaBuffer(uint8_t* data, int64_t size,
+ const shared_ptr[CCudaContext]& context,
+ c_bool own_data=false, c_bool is_ipc=false)
+ CCudaBuffer(const shared_ptr[CCudaBuffer]& parent,
+ const int64_t offset, const int64_t size)
+
+ @staticmethod
+ CStatus FromBuffer(shared_ptr[CBuffer] buffer,
+ shared_ptr[CCudaBuffer]* out)
+
+ CStatus CopyToHost(const int64_t position, const int64_t nbytes,
+ void* out) const
+ CStatus CopyFromHost(const int64_t position, const void* data,
+ int64_t nbytes)
+ CStatus ExportForIpc(shared_ptr[CCudaIpcMemHandle]* handle)
+ shared_ptr[CCudaContext] context() const
+
+ cdef cppclass CCudaHostBuffer" arrow::gpu::CudaHostBuffer"(CMutableBuffer):
+ pass
+
+ cdef cppclass \
+ CCudaBufferReader" arrow::gpu::CudaBufferReader"(CBufferReader):
+ CCudaBufferReader(const shared_ptr[CBuffer]& buffer)
+ CStatus Read(int64_t nbytes, int64_t* bytes_read, void* buffer)
+ CStatus Read(int64_t nbytes, shared_ptr[CBuffer]* out)
+
+ cdef cppclass \
+ CCudaBufferWriter" arrow::gpu::CudaBufferWriter"(WriteableFile):
+ CCudaBufferWriter(const shared_ptr[CCudaBuffer]& buffer)
+ CStatus Close()
+ CStatus Flush()
+ # CStatus Seek(int64_t position)
+ CStatus Write(const void* data, int64_t nbytes)
+ CStatus WriteAt(int64_t position, const void* data, int64_t nbytes)
+ # CStatus Tell(int64_t* position) const
+ CStatus SetBufferSize(const int64_t buffer_size)
+ int64_t buffer_size()
+ int64_t num_bytes_buffered() const
+
+ CStatus AllocateCudaHostBuffer(const int64_t size,
+ shared_ptr[CCudaHostBuffer]* out)
+
+    # The Cuda prefix is added so that Cython does not pick up the
+    # same-named functions from the arrow namespace.
+ CStatus CudaSerializeRecordBatch" arrow::gpu::SerializeRecordBatch"\
+ (const CRecordBatch& batch,
+ CCudaContext* ctx,
+ shared_ptr[CCudaBuffer]* out)
+ CStatus CudaReadMessage" arrow::gpu::ReadMessage"\
+ (CCudaBufferReader* reader,
+ CMemoryPool* pool,
+ unique_ptr[CMessage]* message)
+ CStatus CudaReadRecordBatch" arrow::gpu::ReadRecordBatch"\
+ (const shared_ptr[CSchema]& schema,
+ const shared_ptr[CCudaBuffer]& buffer,
+ CMemoryPool* pool, shared_ptr[CRecordBatch]* out)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index c60c21c..5c259ea 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -19,9 +19,6 @@ cdef class Message:
"""
Container for an Arrow IPC message with metadata and optional body
"""
- cdef:
- unique_ptr[CMessage] message
-
def __cinit__(self):
pass
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index e392361..7749854 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -29,6 +29,10 @@ cdef extern from "Python.h":
cdef int check_status(const CStatus& status) nogil except -1
+cdef class Message:
+ cdef:
+ unique_ptr[CMessage] message
+
cdef class MemoryPool:
cdef:
diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py
new file mode 100644
index 0000000..926cc48
--- /dev/null
+++ b/python/pyarrow/tests/test_cuda.py
@@ -0,0 +1,579 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+UNTESTED:
+read_message
+"""
+
+
+import pytest
+import pyarrow as pa
+import numpy as np
+import sysconfig
+
+cuda = pytest.importorskip("pyarrow.cuda")
+
platform = sysconfig.get_platform()
+# TODO: enable ppc64 once Arrow C++ supports IPC on ppc64 systems:
+has_ipc_support = platform == 'linux-x86_64'  # or 'ppc64' in platform
+
+cuda_ipc = pytest.mark.skipif(
+    not has_ipc_support,
+    reason='CUDA IPC not supported on platform `%s`' % platform)
+
+global_context = None # for flake8
+
+
+def setup_module(module):
+ module.global_context = cuda.Context(0)
+
+
+def teardown_module(module):
+ del module.global_context
+
+
+def test_Context():
+ assert cuda.Context.get_num_devices() > 0
+ assert global_context.device_number == 0
+
+ with pytest.raises(ValueError):
+ try:
+ cuda.Context(cuda.Context.get_num_devices())
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ "device_number argument must be non-negative less than")
+ raise
+
+
+def test_manage_allocate_free_host():
+ size = 1024
+
+ buf = cuda.new_host_buffer(size)
+ arr = np.frombuffer(buf, dtype=np.uint8)
+ arr[size//4:3*size//4] = 1
+ arr_cp = arr.copy()
+ arr2 = np.frombuffer(buf, dtype=np.uint8)
+ np.testing.assert_equal(arr2, arr_cp)
+ assert buf.size == size
+
+
+def test_context_allocate_del():
+ bytes_allocated = global_context.bytes_allocated
+ cudabuf = global_context.new_buffer(128)
+ assert global_context.bytes_allocated == bytes_allocated + 128
+ del cudabuf
+ assert global_context.bytes_allocated == bytes_allocated
+
+
+def make_random_buffer(size, target='host'):
+ """Return a host or device buffer with random data.
+ """
+ if target == 'host':
+ assert size >= 0
+ buf = pa.allocate_buffer(size)
+ assert buf.size == size
+ arr = np.frombuffer(buf, dtype=np.uint8)
+ assert arr.size == size
+ arr[:] = np.random.randint(low=0, high=255, size=size, dtype=np.uint8)
+ assert arr.sum() > 0
+ arr_ = np.frombuffer(buf, dtype=np.uint8)
+ np.testing.assert_equal(arr, arr_)
+ return arr, buf
+ elif target == 'device':
+ arr, buf = make_random_buffer(size, target='host')
+ dbuf = global_context.new_buffer(size)
+ assert dbuf.size == size
+ dbuf.copy_from_host(buf, position=0, nbytes=size)
+ return arr, dbuf
+ raise ValueError('invalid target value')
+
+
+def test_context_device_buffer():
+ # Creating device buffer from host buffer;
+ size = 8
+ arr, buf = make_random_buffer(size)
+ cudabuf = global_context.buffer_from_data(buf)
+ assert cudabuf.size == size
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+ # CudaBuffer does not support buffer protocol
+ with pytest.raises(BufferError):
+ try:
+ np.frombuffer(cudabuf, dtype=np.uint8)
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ "buffer protocol for device buffer not supported")
+ raise
+
+ # Creating device buffer from array:
+ cudabuf = global_context.buffer_from_data(arr)
+ assert cudabuf.size == size
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+ # Creating device buffer from bytes:
+ cudabuf = global_context.buffer_from_data(arr.tobytes())
+ assert cudabuf.size == size
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+ # Creating device buffer from another device buffer:
+ # cudabuf2 = global_context.buffer_from_data(cudabuf) # TODO: copy
+ cudabuf2 = cudabuf.slice(0, cudabuf.size) # view
+ assert cudabuf2.size == size
+ arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+ # Creating a device buffer from a slice of host buffer
+ soffset = size//4
+ ssize = 2*size//4
+ cudabuf = global_context.buffer_from_data(buf, offset=soffset,
+ size=ssize)
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)
+
+ cudabuf = global_context.buffer_from_data(buf.slice(offset=soffset,
+ length=ssize))
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)
+
+ # Creating a device buffer from a slice of an array
+ cudabuf = global_context.buffer_from_data(arr, offset=soffset, size=ssize)
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)
+
+ cudabuf = global_context.buffer_from_data(arr[soffset:soffset+ssize])
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)
+
+ # Creating a device buffer from a slice of bytes
+ cudabuf = global_context.buffer_from_data(arr.tobytes(),
+ offset=soffset,
+ size=ssize)
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)
+
+ # Creating a device buffer from size
+ cudabuf = global_context.new_buffer(size)
+ assert cudabuf.size == size
+
+ # Creating device buffer from a slice of another device buffer:
+ cudabuf = global_context.buffer_from_data(arr)
+ cudabuf2 = cudabuf.slice(soffset, ssize)
+ assert cudabuf2.size == ssize
+ arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)
+
+ # Creating device buffer from HostBuffer
+
+ buf = cuda.new_host_buffer(size)
+ arr_ = np.frombuffer(buf, dtype=np.uint8)
+ arr_[:] = arr
+ cudabuf = global_context.buffer_from_data(buf)
+ assert cudabuf.size == size
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+ # Creating device buffer from HostBuffer slice
+
+ cudabuf = global_context.buffer_from_data(buf, offset=soffset, size=ssize)
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)
+
+ cudabuf = global_context.buffer_from_data(
+ buf.slice(offset=soffset, length=ssize))
+ assert cudabuf.size == ssize
+ arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)
+
+
+def test_CudaBuffer():
+ size = 8
+ arr, buf = make_random_buffer(size)
+ assert arr.tobytes() == buf.to_pybytes()
+ cbuf = global_context.buffer_from_data(buf)
+ assert cbuf.size == size
+ assert arr.tobytes() == cbuf.to_pybytes()
+
+ for i in range(size):
+ assert cbuf[i] == arr[i]
+
+ for s in [
+ slice(None),
+ slice(size//4, size//2),
+ ]:
+ assert cbuf[s].to_pybytes() == arr[s].tobytes()
+
+ sbuf = cbuf.slice(size//4, size//2)
+ assert sbuf.parent == cbuf
+
+ with pytest.raises(TypeError):
+ try:
+ cuda.CudaBuffer()
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ "Do not call CudaBuffer's constructor directly")
+ raise
+
+
+def test_HostBuffer():
+ size = 8
+ arr, buf = make_random_buffer(size)
+ assert arr.tobytes() == buf.to_pybytes()
+ hbuf = cuda.new_host_buffer(size)
+ np.frombuffer(hbuf, dtype=np.uint8)[:] = arr
+ assert hbuf.size == size
+ assert arr.tobytes() == hbuf.to_pybytes()
+ for i in range(size):
+ assert hbuf[i] == arr[i]
+ for s in [
+ slice(None),
+ slice(size//4, size//2),
+ ]:
+ assert hbuf[s].to_pybytes() == arr[s].tobytes()
+
+ sbuf = hbuf.slice(size//4, size//2)
+ assert sbuf.parent == hbuf
+
+ del hbuf
+
+ with pytest.raises(TypeError):
+ try:
+ cuda.HostBuffer()
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ "Do not call HostBuffer's constructor directly")
+ raise
+
+
+def test_copy_from_to_host():
+ size = 1024
+
+ # Create a buffer in host containing range(size)
+ buf = pa.allocate_buffer(size, resizable=True) # in host
+ assert isinstance(buf, pa.Buffer)
+ assert not isinstance(buf, cuda.CudaBuffer)
+ arr = np.frombuffer(buf, dtype=np.uint8)
+ assert arr.size == size
+ arr[:] = range(size)
+ arr_ = np.frombuffer(buf, dtype=np.uint8)
+ np.testing.assert_equal(arr, arr_)
+
+ device_buffer = global_context.new_buffer(size)
+ assert isinstance(device_buffer, cuda.CudaBuffer)
+ assert isinstance(device_buffer, pa.Buffer)
+ assert device_buffer.size == size
+
+ device_buffer.copy_from_host(buf, position=0, nbytes=size)
+
+ buf2 = device_buffer.copy_to_host(position=0, nbytes=size)
+ arr2 = np.frombuffer(buf2, dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+
+def test_copy_to_host():
+ size = 1024
+ arr, dbuf = make_random_buffer(size, target='device')
+
+ buf = dbuf.copy_to_host()
+ np.testing.assert_equal(arr, np.frombuffer(buf, dtype=np.uint8))
+
+ buf = dbuf.copy_to_host(position=size//4)
+ np.testing.assert_equal(arr[size//4:], np.frombuffer(buf, dtype=np.uint8))
+
+ buf = dbuf.copy_to_host(position=size//4, nbytes=size//8)
+ np.testing.assert_equal(arr[size//4:size//4+size//8],
+ np.frombuffer(buf, dtype=np.uint8))
+
+ buf = dbuf.copy_to_host(position=size//4, nbytes=0)
+ assert buf.size == 0
+
+ for (position, nbytes) in [
+ (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
+ ]:
+ with pytest.raises(ValueError):
+ try:
+ dbuf.copy_to_host(position=position, nbytes=nbytes)
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ 'position argument is out-of-range')
+ raise
+
+ for (position, nbytes) in [
+ (0, size+1), (size//2, size//2+1), (size, 1)
+ ]:
+ with pytest.raises(ValueError):
+ try:
+ dbuf.copy_to_host(position=position, nbytes=nbytes)
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ 'requested more to copy than available from device buffer')
+ raise
+
+ buf = pa.allocate_buffer(size//4)
+ dbuf.copy_to_host(buf=buf)
+ np.testing.assert_equal(arr[:size//4], np.frombuffer(buf, dtype=np.uint8))
+
+ dbuf.copy_to_host(buf=buf, position=12)
+ np.testing.assert_equal(arr[12:12+size//4],
+ np.frombuffer(buf, dtype=np.uint8))
+
+ dbuf.copy_to_host(buf=buf, nbytes=12)
+ np.testing.assert_equal(arr[:12], np.frombuffer(buf, dtype=np.uint8)[:12])
+
+ dbuf.copy_to_host(buf=buf, nbytes=12, position=6)
+ np.testing.assert_equal(arr[6:6+12],
+ np.frombuffer(buf, dtype=np.uint8)[:12])
+
+ for (position, nbytes) in [
+ (0, size+10), (10, size-5),
+ (0, size//2), (size//4, size//4+1)
+ ]:
+ with pytest.raises(ValueError):
+ try:
+ dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)
+ print('dbuf.size={}, buf.size={}, position={}, nbytes={}'
+ .format(dbuf.size, buf.size, position, nbytes))
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ 'requested copy does not fit into host buffer')
+ raise
+
+
+def test_copy_from_host():
+ size = 1024
+ arr, buf = make_random_buffer(size=size, target='host')
+ lst = arr.tolist()
+ dbuf = global_context.new_buffer(size)
+
+ def put(*args, **kwargs):
+ dbuf.copy_from_host(buf, *args, **kwargs)
+ rbuf = dbuf.copy_to_host()
+ return np.frombuffer(rbuf, dtype=np.uint8).tolist()
+ assert put() == lst
+ assert put(position=size//4) == lst[:size//4]+lst[:-size//4]
+ assert put() == lst
+ assert put(position=1, nbytes=size//2) == \
+ lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]
+
+ for (position, nbytes) in [
+ (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
+ ]:
+ with pytest.raises(ValueError):
+ try:
+ put(position=position, nbytes=nbytes)
+ print('dbuf.size={}, buf.size={}, position={}, nbytes={}'
+ .format(dbuf.size, buf.size, position, nbytes))
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ 'position argument is out-of-range')
+ raise
+
+ for (position, nbytes) in [
+ (0, size+1),
+ ]:
+ with pytest.raises(ValueError):
+ try:
+ put(position=position, nbytes=nbytes)
+ print('dbuf.size={}, buf.size={}, position={}, nbytes={}'
+ .format(dbuf.size, buf.size, position, nbytes))
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ 'requested more to copy than available from host buffer')
+ raise
+
+ for (position, nbytes) in [
+ (size//2, size//2+1)
+ ]:
+ with pytest.raises(ValueError):
+ try:
+ put(position=position, nbytes=nbytes)
+ print('dbuf.size={}, buf.size={}, position={}, nbytes={}'
+ .format(dbuf.size, buf.size, position, nbytes))
+ except Exception as e_info:
+ assert str(e_info).startswith(
+ 'requested more to copy than available in device buffer')
+ raise
+
+
+def test_BufferWriter():
+ def allocate(size):
+ cbuf = global_context.new_buffer(size)
+ writer = cuda.BufferWriter(cbuf)
+ return cbuf, writer
+
+ def test_writes(total_size, chunksize, buffer_size=0):
+ cbuf, writer = allocate(total_size)
+ arr, buf = make_random_buffer(size=total_size, target='host')
+
+ if buffer_size > 0:
+ writer.buffer_size = buffer_size
+
+ position = writer.tell()
+ assert position == 0
+ writer.write(buf.slice(length=chunksize))
+ assert writer.tell() == chunksize
+ writer.seek(0)
+ position = writer.tell()
+ assert position == 0
+
+ while position < total_size:
+ bytes_to_write = min(chunksize, total_size - position)
+ writer.write(buf.slice(offset=position, length=bytes_to_write))
+ position += bytes_to_write
+
+ writer.flush()
+ assert cbuf.size == total_size
+ buf2 = cbuf.copy_to_host()
+ assert buf2.size == total_size
+ arr2 = np.frombuffer(buf2, dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+ total_size, chunk_size = 1 << 16, 1000
+ test_writes(total_size, chunk_size)
+ test_writes(total_size, chunk_size, total_size // 16)
+
+ cbuf, writer = allocate(100)
+ writer.write(np.arange(100, dtype=np.uint8))
+ writer.writeat(50, np.arange(25, dtype=np.uint8))
+ writer.write(np.arange(25, dtype=np.uint8))
+ writer.flush()
+
+ arr = np.frombuffer(cbuf.copy_to_host(), np.uint8)
+ np.testing.assert_equal(arr[:50], np.arange(50, dtype=np.uint8))
+ np.testing.assert_equal(arr[50:75], np.arange(25, dtype=np.uint8))
+ np.testing.assert_equal(arr[75:], np.arange(25, dtype=np.uint8))
+
+
+def test_BufferWriter_edge_cases():
+ # edge cases, see cuda-test.cc for more information:
+ size = 1000
+ cbuf = global_context.new_buffer(size)
+ writer = cuda.BufferWriter(cbuf)
+ arr, buf = make_random_buffer(size=size, target='host')
+
+ assert writer.buffer_size == 0
+ writer.buffer_size = 100
+ assert writer.buffer_size == 100
+
+ writer.write(buf.slice(length=0))
+ assert writer.tell() == 0
+
+ writer.write(buf.slice(length=10))
+ writer.buffer_size = 200
+ assert writer.buffer_size == 200
+ assert writer.num_bytes_buffered == 0
+
+ writer.write(buf.slice(offset=10, length=300))
+ assert writer.num_bytes_buffered == 0
+
+ writer.write(buf.slice(offset=310, length=200))
+ assert writer.num_bytes_buffered == 0
+
+ writer.write(buf.slice(offset=510, length=390))
+ writer.write(buf.slice(offset=900, length=100))
+
+ writer.flush()
+
+ buf2 = cbuf.copy_to_host()
+ assert buf2.size == size
+ arr2 = np.frombuffer(buf2, dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+
+def test_BufferReader():
+ size = 1000
+ arr, cbuf = make_random_buffer(size=size, target='device')
+
+ reader = cuda.BufferReader(cbuf)
+ reader.seek(950)
+ assert reader.tell() == 950
+
+ data = reader.read(100)
+ assert len(data) == 50
+ assert reader.tell() == 1000
+
+ reader.seek(925)
+ arr2 = np.zeros(100, dtype=np.uint8)
+ n = reader.readinto(arr2)
+ assert n == 75
+ assert reader.tell() == 1000
+ np.testing.assert_equal(arr[925:], arr2[:75])
+
+ reader.seek(0)
+ assert reader.tell() == 0
+ buf2 = reader.read_buffer()
+ arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8)
+ np.testing.assert_equal(arr, arr2)
+
+
+def make_recordbatch(length):
+ schema = pa.schema([pa.field('f0', pa.int16()),
+ pa.field('f1', pa.int16())])
+ a0 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16))
+ a1 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16))
+ batch = pa.RecordBatch.from_arrays([a0, a1], schema)
+ return batch
+
+
+def test_batch_serialize():
+ batch = make_recordbatch(10)
+ hbuf = batch.serialize()
+ cbuf = cuda.serialize_record_batch(batch, global_context)
+ # test that read_record_batch works properly:
+ cuda.read_record_batch(batch.schema, cbuf)
+ buf = cbuf.copy_to_host()
+ assert hbuf.equals(buf)
+ batch2 = pa.read_record_batch(buf, batch.schema)
+ assert hbuf.equals(batch2.serialize())
+ assert batch.num_columns == batch2.num_columns
+ assert batch.num_rows == batch2.num_rows
+ assert batch.column(0).equals(batch2.column(0))
+ assert batch.equals(batch2)
+
+
+def other_process_for_test_IPC(handle_buffer, expected_arr):
+ other_context = pa.cuda.Context(0)
+ ipc_handle = pa.cuda.IpcMemHandle.from_buffer(handle_buffer)
+ ipc_buf = other_context.open_ipc_buffer(ipc_handle)
+ buf = ipc_buf.copy_to_host()
+ assert buf.size == expected_arr.size, repr((buf.size, expected_arr.size))
+ arr = np.frombuffer(buf, dtype=expected_arr.dtype)
+ np.testing.assert_equal(arr, expected_arr)
+
+
+@cuda_ipc
+def test_IPC():
+ import multiprocessing
+ ctx = multiprocessing.get_context('spawn')
+ size = 1000
+ arr, cbuf = make_random_buffer(size=size, target='device')
+ ipc_handle = cbuf.export_for_ipc()
+ handle_buffer = ipc_handle.serialize()
+ p = ctx.Process(target=other_process_for_test_IPC,
+ args=(handle_buffer, arr))
+ p.start()
+ p.join()
+ assert p.exitcode == 0
diff --git a/python/setup.py b/python/setup.py
index a903998..3595d3e 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -98,6 +98,7 @@ class build_ext(_build_ext):
'build type (debug or release), default release'),
('boost-namespace=', None,
'namespace of boost (default: boost)'),
+ ('with-cuda', None, 'build the CUDA extension'),
('with-parquet', None, 'build the Parquet extension'),
('with-static-parquet', None, 'link parquet statically'),
('with-static-boost', None, 'link boost statically'),
@@ -132,6 +133,8 @@ class build_ext(_build_ext):
if not hasattr(sys, 'gettotalrefcount'):
self.build_type = 'release'
+ self.with_cuda = strtobool(
+ os.environ.get('PYARROW_WITH_CUDA', '0'))
self.with_parquet = strtobool(
os.environ.get('PYARROW_WITH_PARQUET', '0'))
self.with_static_parquet = strtobool(
@@ -153,6 +156,7 @@ class build_ext(_build_ext):
CYTHON_MODULE_NAMES = [
'lib',
+ '_cuda',
'_parquet',
'_orc',
'_plasma']
@@ -189,6 +193,8 @@ class build_ext(_build_ext):
if self.cmake_generator:
cmake_options += ['-G', self.cmake_generator]
+ if self.with_cuda:
+ cmake_options.append('-DPYARROW_BUILD_CUDA=on')
if self.with_parquet:
cmake_options.append('-DPYARROW_BUILD_PARQUET=on')
if self.with_static_parquet:
@@ -248,7 +254,7 @@ class build_ext(_build_ext):
'-j{0}'.format(os.environ['PYARROW_PARALLEL']))
# Generate the build files
- print("-- Runnning cmake for pyarrow")
+ print("-- Running cmake for pyarrow")
self.spawn(['cmake'] + extra_cmake_args + cmake_options + [source])
print("-- Finished cmake for pyarrow")
@@ -277,6 +283,8 @@ class build_ext(_build_ext):
print(pjoin(build_lib, 'pyarrow'))
move_shared_libs(build_prefix, build_lib, "arrow")
move_shared_libs(build_prefix, build_lib, "arrow_python")
+ if self.with_cuda:
+ move_shared_libs(build_prefix, build_lib, "arrow_gpu")
if self.with_plasma:
move_shared_libs(build_prefix, build_lib, "plasma")
if self.with_parquet and not self.with_static_parquet:
@@ -362,6 +370,8 @@ class build_ext(_build_ext):
return True
if name == '_orc' and not self.with_orc:
return True
+ if name == '_cuda' and not self.with_cuda:
+ return True
return False
def _get_build_dir(self):