Repository: arrow
Updated Branches:
  refs/heads/master e881f1155 -> 5b35d6bda
ARROW-457: Python: Better control over memory pool

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #315 from xhochy/ARROW-457 and squashes the following commits:

dc5abdb [Uwe L. Korn] Use aligned deallocator
20c8505 [Uwe L. Korn] ARROW-458: Python: Expose jemalloc MemoryPool
2962bd8 [Uwe L. Korn] ARROW-457: Python: Better control over memory pool


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5b35d6bd
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5b35d6bd
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5b35d6bd

Branch: refs/heads/master
Commit: 5b35d6bda94e901d25aaf3d622dbe47214f75488
Parents: e881f11
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sat Feb 4 16:23:46 2017 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sat Feb 4 16:23:46 2017 -0500

----------------------------------------------------------------------
 ci/travis_script_python.sh | 3 +-
 cpp/src/arrow/jemalloc/memory_pool.cc | 2 +-
 python/CMakeLists.txt | 15 ++++++
 python/cmake_modules/FindArrow.cmake | 14 +++++
 python/pyarrow/__init__.py | 3 +-
 python/pyarrow/_parquet.pxd | 8 +--
 python/pyarrow/_parquet.pyx | 13 +++--
 python/pyarrow/array.pyx | 32 ++++++------
 python/pyarrow/includes/libarrow.pxd | 6 +--
 python/pyarrow/includes/libarrow_io.pxd | 2 +-
 python/pyarrow/includes/libarrow_ipc.pxd | 3 +-
 python/pyarrow/includes/libarrow_jemalloc.pxd | 27 ++++++++++
 python/pyarrow/includes/pyarrow.pxd | 9 ++--
 python/pyarrow/io.pyx | 18 +++----
 python/pyarrow/jemalloc.pyx | 28 ++++++++++
 python/pyarrow/memory.pxd | 27 ++++++++++
 python/pyarrow/memory.pyx | 49 +++++++++++++++++
 python/pyarrow/tests/test_jemalloc.py | 56 ++++++++++++++++++++
 python/setup.py | 11 +++-
 python/src/pyarrow/adapters/builtin.cc | 6 ++-
 python/src/pyarrow/adapters/builtin.h | 3 +-
 python/src/pyarrow/common.cc | 61 +++++-----------------
 python/src/pyarrow/common.h | 1 +
 23 files changed, 298 insertions(+), 99 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/ci/travis_script_python.sh ---------------------------------------------------------------------- diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh index c186fd4..11d8d89 100755 --- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -85,9 +85,10 @@ python_version_tests() { # Other stuff pip install pip install -r requirements.txt - python setup.py build_ext --inplace --with-parquet + python setup.py build_ext --inplace --with-parquet --with-jemalloc python -c "import pyarrow.parquet" + python -c "import pyarrow.jemalloc" python -m pytest -vv -r sxX pyarrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/cpp/src/arrow/jemalloc/memory_pool.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/jemalloc/memory_pool.cc b/cpp/src/arrow/jemalloc/memory_pool.cc index c568316..f7a1446 100644 --- a/cpp/src/arrow/jemalloc/memory_pool.cc +++ b/cpp/src/arrow/jemalloc/memory_pool.cc @@ -65,7 +65,7 @@ Status MemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) void MemoryPool::Free(uint8_t* buffer, int64_t size) { allocated_size_ -= size; - free(buffer); + dallocx(buffer, MALLOCX_ALIGN(kAlignment)); } int64_t MemoryPool::bytes_allocated() const { http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 942e74b..898c48e 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -53,6 +53,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF) + option(PYARROW_BUILD_JEMALLOC + "Build the PyArrow jemalloc integration" + OFF) endif() if(NOT PYARROW_BUILD_TESTS) @@ -412,6 +415,7 @@ 
set(CYTHON_EXTENSIONS config error io + memory scalar schema table @@ -446,6 +450,17 @@ if (PYARROW_BUILD_PARQUET) _parquet) endif() +if (PYARROW_BUILD_JEMALLOC) + ADD_THIRDPARTY_LIB(arrow_jemalloc + SHARED_LIB ${ARROW_JEMALLOC_SHARED_LIB}) + set(LINK_LIBS + ${LINK_LIBS} + arrow_jemalloc) + set(CYTHON_EXTENSIONS + ${CYTHON_EXTENSIONS} + jemalloc) +endif() + add_library(pyarrow SHARED ${PYARROW_SRCS}) target_link_libraries(pyarrow ${LINK_LIBS}) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/cmake_modules/FindArrow.cmake ---------------------------------------------------------------------- diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake index 3c359aa..5d0207d 100644 --- a/python/cmake_modules/FindArrow.cmake +++ b/python/cmake_modules/FindArrow.cmake @@ -52,11 +52,17 @@ find_library(ARROW_IPC_LIB_PATH NAMES arrow_ipc ${ARROW_SEARCH_LIB_PATH} NO_DEFAULT_PATH) +find_library(ARROW_JEMALLOC_LIB_PATH NAMES arrow_jemalloc + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) set(ARROW_FOUND TRUE) set(ARROW_LIB_NAME libarrow) set(ARROW_IO_LIB_NAME libarrow_io) set(ARROW_IPC_LIB_NAME libarrow_ipc) + set(ARROW_JEMALLOC_LIB_NAME libarrow_jemalloc) set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH}) set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a) @@ -68,10 +74,14 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) set(ARROW_IPC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IPC_LIB_NAME}.a) set(ARROW_IPC_SHARED_LIB ${ARROW_LIBS}/${ARROW_IPC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_JEMALLOC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_JEMALLOC_LIB_NAME}.a) + set(ARROW_JEMALLOC_SHARED_LIB ${ARROW_LIBS}/${ARROW_JEMALLOC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + if (NOT Arrow_FIND_QUIETLY) message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}") message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}") message(STATUS "Found the Arrow IPC library: 
${ARROW_IPC_LIB_PATH}") + message(STATUS "Found the Arrow jemalloc library: ${ARROW_JEMALLOC_LIB_PATH}") endif () else () if (NOT Arrow_FIND_QUIETLY) @@ -94,4 +104,8 @@ mark_as_advanced( ARROW_SHARED_LIB ARROW_IO_STATIC_LIB ARROW_IO_SHARED_LIB + ARROW_IPC_STATIC_LIB + ARROW_IPC_SHARED_LIB + ARROW_JEMALLOC_STATIC_LIB + ARROW_JEMALLOC_SHARED_LIB ) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 7c521db..ea4710d 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -30,7 +30,6 @@ from pyarrow.config import cpu_count, set_cpu_count from pyarrow.array import (Array, from_pandas_series, from_pylist, - total_allocated_bytes, NumericArray, IntegerArray, FloatingPointArray, BooleanArray, Int8Array, UInt8Array, @@ -48,6 +47,8 @@ from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface, from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter +from pyarrow.memory import MemoryPool, total_allocated_bytes + from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType, BooleanValue, Int8Value, Int16Value, Int32Value, Int64Value, http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/_parquet.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 6b9350a..005be91 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -19,7 +19,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus, - CTable, MemoryPool) + CTable, CMemoryPool) from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream @@ -204,13 +204,13 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil: 
CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file, - MemoryPool* allocator, + CMemoryPool* allocator, const ReaderProperties& properties, const shared_ptr[CFileMetaData]& metadata, unique_ptr[FileReader]* reader) cdef cppclass FileReader: - FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader) + FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader) CStatus ReadColumn(int i, shared_ptr[CArray]* out); CStatus ReadTable(shared_ptr[CTable]* out); CStatus ReadTable(const vector[int]& column_indices, @@ -229,7 +229,7 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil: cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: cdef CStatus WriteTable( - const CTable* table, MemoryPool* pool, + const CTable* table, CMemoryPool* pool, const shared_ptr[OutputStream]& sink, int64_t chunk_size, const shared_ptr[WriterProperties]& properties) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/_parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index fd4670a..796c436 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -32,6 +32,7 @@ from pyarrow.compat import tobytes, frombytes from pyarrow.error import ArrowException from pyarrow.error cimport check_status from pyarrow.io import NativeFile +from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool from pyarrow.table cimport Table from pyarrow.io cimport NativeFile, get_reader, get_writer @@ -342,13 +343,13 @@ cdef logical_type_name_from_enum(ParquetLogicalType type_): cdef class ParquetReader: cdef: object source - MemoryPool* allocator + CMemoryPool* allocator unique_ptr[FileReader] reader column_idx_map FileMetaData _metadata - def __cinit__(self): - self.allocator = default_memory_pool() + def __cinit__(self, MemoryPool memory_pool=None): + self.allocator = 
maybe_unbox_memory_pool(memory_pool) self._metadata = None def open(self, object source, FileMetaData metadata=None): @@ -471,6 +472,7 @@ cdef class ParquetWriter: cdef: shared_ptr[WriterProperties] properties shared_ptr[OutputStream] sink + CMemoryPool* allocator cdef readonly: object use_dictionary @@ -479,7 +481,7 @@ cdef class ParquetWriter: int row_group_size def __cinit__(self, where, use_dictionary=None, compression=None, - version=None): + version=None, MemoryPool memory_pool=None): cdef shared_ptr[FileOutputStream] filestream if isinstance(where, six.string_types): @@ -487,6 +489,7 @@ cdef class ParquetWriter: self.sink = <shared_ptr[OutputStream]> filestream else: get_writer(where, &self.sink) + self.allocator = maybe_unbox_memory_pool(memory_pool) self.use_dictionary = use_dictionary self.compression = compression @@ -540,6 +543,6 @@ cdef class ParquetWriter: cdef int c_row_group_size = row_group_size with nogil: - check_status(WriteTable(ctable, default_memory_pool(), + check_status(WriteTable(ctable, self.allocator, self.sink, c_row_group_size, self.properties)) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/array.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index c3a5a04..9b34f56 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -29,6 +29,7 @@ import pyarrow.config from pyarrow.compat import frombytes, tobytes from pyarrow.error cimport check_status +from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool cimport pyarrow.scalar as scalar from pyarrow.scalar import NA @@ -44,11 +45,6 @@ cdef _pandas(): return pd -def total_allocated_bytes(): - cdef MemoryPool* pool = pyarrow.get_memory_pool() - return pool.bytes_allocated() - - cdef class Array: cdef init(self, const shared_ptr[CArray]& sp_array): @@ -58,7 +54,7 @@ cdef class Array: self.type.init(self.sp_array.get().type()) @staticmethod - def 
from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None): + def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None, MemoryPool memory_pool=None): """ Convert pandas.Series to an Arrow Array. @@ -74,6 +70,9 @@ cdef class Array: compatibility with other functionality like Parquet I/O which only supports milliseconds. + memory_pool: MemoryPool, optional + Specific memory pool to use to allocate the resulting Arrow array. + Notes ----- Localized timestamps will currently be returned as UTC (pandas's native representation). @@ -107,6 +106,7 @@ cdef class Array: cdef: shared_ptr[CArray] out shared_ptr[CField] c_field + CMemoryPool* pool pd = _pandas() @@ -121,20 +121,20 @@ cdef class Array: if isinstance(series_values, pd.Categorical): return DictionaryArray.from_arrays(series_values.codes, series_values.categories.values, - mask=mask) + mask=mask, memory_pool=memory_pool) else: if series_values.dtype.type == np.datetime64 and timestamps_to_ms: series_values = series_values.astype('datetime64[ms]') + pool = maybe_unbox_memory_pool(memory_pool) with nogil: check_status(pyarrow.PandasToArrow( - pyarrow.get_memory_pool(), series_values, mask, - c_field, &out)) + pool, series_values, mask, c_field, &out)) return box_arrow_array(out) @staticmethod - def from_list(object list_obj, DataType type=None): + def from_list(object list_obj, DataType type=None, MemoryPool memory_pool=None): """ Convert Python list to Arrow array @@ -147,10 +147,12 @@ cdef class Array: pyarrow.array.Array """ cdef: - shared_ptr[CArray] sp_array + shared_ptr[CArray] sp_array + CMemoryPool* pool + pool = maybe_unbox_memory_pool(memory_pool) if type is None: - check_status(pyarrow.ConvertPySequence(list_obj, &sp_array)) + check_status(pyarrow.ConvertPySequence(list_obj, pool, &sp_array)) else: raise NotImplementedError() @@ -330,7 +332,7 @@ cdef class BinaryArray(Array): cdef class DictionaryArray(Array): @staticmethod - def from_arrays(indices, dictionary, mask=None): + def 
from_arrays(indices, dictionary, mask=None, MemoryPool memory_pool=None): """ Construct Arrow DictionaryArray from array of indices (must be non-negative integers) and corresponding array of dictionary values @@ -352,8 +354,8 @@ cdef class DictionaryArray(Array): shared_ptr[CDataType] c_type shared_ptr[CArray] c_result - arrow_indices = Array.from_pandas(indices, mask=mask) - arrow_dictionary = Array.from_pandas(dictionary) + arrow_indices = Array.from_pandas(indices, mask=mask, memory_pool=memory_pool) + arrow_dictionary = Array.from_pandas(dictionary, memory_pool=memory_pool) if not isinstance(arrow_indices, IntegerArray): raise ValueError('Indices must be integer type') http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 6284ad3..38883e8 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -90,7 +90,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CDataType] timestamp(TimeUnit unit) - cdef cppclass MemoryPool" arrow::MemoryPool": + cdef cppclass CMemoryPool" arrow::MemoryPool": int64_t bytes_allocated() cdef cppclass CBuffer" arrow::Buffer": @@ -104,9 +104,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef cppclass PoolBuffer(ResizableBuffer): PoolBuffer() - PoolBuffer(MemoryPool*) + PoolBuffer(CMemoryPool*) - cdef MemoryPool* default_memory_pool() + cdef CMemoryPool* default_memory_pool() cdef cppclass CListType" arrow::ListType"(CDataType): CListType(const shared_ptr[CDataType]& value_type) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/includes/libarrow_io.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index 3137938..8d0d524 
100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -82,7 +82,7 @@ cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil: CStatus Open(const c_string& path, shared_ptr[ReadableFile]* file) @staticmethod - CStatus Open(const c_string& path, MemoryPool* memory_pool, + CStatus Open(const c_string& path, CMemoryPool* memory_pool, shared_ptr[ReadableFile]* file) int file_descriptor() http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/includes/libarrow_ipc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd index bfece14..5ab9815 100644 --- a/python/pyarrow/includes/libarrow_ipc.pxd +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ -18,8 +18,7 @@ # distutils: language = c++ from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema, - CRecordBatch) +from pyarrow.includes.libarrow cimport (CArray, CSchema, CRecordBatch) from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream, ReadableFileInterface) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/includes/libarrow_jemalloc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_jemalloc.pxd b/python/pyarrow/includes/libarrow_jemalloc.pxd new file mode 100644 index 0000000..0609d19 --- /dev/null +++ b/python/pyarrow/includes/libarrow_jemalloc.pxd @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * + +cdef extern from "arrow/jemalloc/memory_pool.h" namespace "arrow::jemalloc" nogil: + cdef cppclass CJemallocMemoryPool" arrow::jemalloc::MemoryPool": + int64_t bytes_allocated() + @staticmethod + CMemoryPool* default_pool() http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/includes/pyarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index 04ad4f3..f1d45e0 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -20,7 +20,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn, CField, CTable, CDataType, CStatus, Type, - MemoryPool, TimeUnit) + CMemoryPool, TimeUnit) cimport pyarrow.includes.libarrow_io as arrow_io @@ -28,9 +28,9 @@ cimport pyarrow.includes.libarrow_io as arrow_io cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: shared_ptr[CDataType] GetPrimitiveType(Type type) shared_ptr[CDataType] GetTimestampType(TimeUnit unit) - CStatus ConvertPySequence(object obj, shared_ptr[CArray]* out) + CStatus ConvertPySequence(object obj, CMemoryPool* pool, shared_ptr[CArray]* out) - CStatus PandasToArrow(MemoryPool* pool, object ao, object mo, + CStatus PandasToArrow(CMemoryPool* pool, object ao, object mo, shared_ptr[CField] field, shared_ptr[CArray]* out) @@ -43,7 +43,8 @@ cdef extern from "pyarrow/api.h" 
namespace "pyarrow" nogil: CStatus ConvertTableToPandas(const shared_ptr[CTable]& table, int nthreads, PyObject** out) - MemoryPool* get_memory_pool() + void set_default_memory_pool(CMemoryPool* pool) + CMemoryPool* get_memory_pool() cdef extern from "pyarrow/common.h" namespace "pyarrow" nogil: http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 8b56508..89ce6e7 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -33,6 +33,7 @@ cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.compat import frombytes, tobytes, encode_file_path from pyarrow.error cimport check_status +from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool from pyarrow.schema cimport Schema from pyarrow.table cimport (RecordBatch, batch_from_cbatch, table_from_ctable) @@ -372,7 +373,7 @@ cdef class OSFile(NativeFile): cdef: object path - def __cinit__(self, path, mode='r'): + def __cinit__(self, path, mode='r', MemoryPool memory_pool=None): self.path = path cdef: @@ -383,7 +384,7 @@ cdef class OSFile(NativeFile): self.is_readable = self.is_writeable = 0 if mode in ('r', 'rb'): - self._open_readable(c_path) + self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool)) elif mode in ('w', 'wb'): self._open_writeable(c_path) else: @@ -391,12 +392,11 @@ cdef class OSFile(NativeFile): self.is_open = True - cdef _open_readable(self, c_string path): + cdef _open_readable(self, c_string path, CMemoryPool* pool): cdef shared_ptr[ReadableFile] handle with nogil: - check_status(ReadableFile.Open(path, pyarrow.get_memory_pool(), - &handle)) + check_status(ReadableFile.Open(path, pool, &handle)) self.is_readable = 1 self.rd_file = <shared_ptr[ReadableFileInterface]> handle @@ -450,9 +450,9 @@ cdef class Buffer: self.buffer.get().size()) -cdef shared_ptr[PoolBuffer] allocate_buffer(): +cdef shared_ptr[PoolBuffer] 
allocate_buffer(CMemoryPool* pool): cdef shared_ptr[PoolBuffer] result - result.reset(new PoolBuffer(pyarrow.get_memory_pool())) + result.reset(new PoolBuffer(pool)) return result @@ -461,8 +461,8 @@ cdef class InMemoryOutputStream(NativeFile): cdef: shared_ptr[PoolBuffer] buffer - def __cinit__(self): - self.buffer = allocate_buffer() + def __cinit__(self, MemoryPool memory_pool=None): + self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool)) self.wr_file.reset(new BufferOutputStream( <shared_ptr[ResizableBuffer]> self.buffer)) self.is_readable = 0 http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/jemalloc.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/jemalloc.pyx b/python/pyarrow/jemalloc.pyx new file mode 100644 index 0000000..97583f4 --- /dev/null +++ b/python/pyarrow/jemalloc.pyx @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool +from pyarrow.memory cimport MemoryPool + +def default_pool(): + cdef MemoryPool pool = MemoryPool() + pool.init(CJemallocMemoryPool.default_pool()) + return pool http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/memory.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/memory.pxd b/python/pyarrow/memory.pxd new file mode 100644 index 0000000..3079ccb --- /dev/null +++ b/python/pyarrow/memory.pxd @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from pyarrow.includes.libarrow cimport CMemoryPool + + +cdef class MemoryPool: + cdef: + CMemoryPool* pool + + cdef init(self, CMemoryPool* pool) + +cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool) http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/memory.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/memory.pyx b/python/pyarrow/memory.pyx new file mode 100644 index 0000000..18a6de4 --- /dev/null +++ b/python/pyarrow/memory.pyx @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from pyarrow.includes.libarrow cimport CMemoryPool +from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool + +cdef class MemoryPool: + cdef init(self, CMemoryPool* pool): + self.pool = pool + + def bytes_allocated(self): + return self.pool.bytes_allocated() + +cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool): + if memory_pool is None: + return get_memory_pool() + else: + return memory_pool.pool + +def default_pool(): + cdef: + MemoryPool pool = MemoryPool() + pool.init(get_memory_pool()) + return pool + +def set_default_pool(MemoryPool pool): + set_default_memory_pool(pool.pool) + +def total_allocated_bytes(): + cdef CMemoryPool* pool = get_memory_pool() + return pool.bytes_allocated() http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/pyarrow/tests/test_jemalloc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_jemalloc.py b/python/pyarrow/tests/test_jemalloc.py new file mode 100644 index 0000000..8efd514 --- /dev/null +++ b/python/pyarrow/tests/test_jemalloc.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import pytest + +try: + import pyarrow.jemalloc + HAVE_JEMALLOC = True +except ImportError: + HAVE_JEMALLOC = False + +jemalloc = pytest.mark.skipif(not HAVE_JEMALLOC, + reason='jemalloc support not built') + + +@jemalloc +def test_different_memory_pool(): + gc.collect() + bytes_before_default = pyarrow.total_allocated_bytes() + bytes_before_jemalloc = pyarrow.jemalloc.default_pool().bytes_allocated() + array = pyarrow.from_pylist([1, None, 3, None], + memory_pool=pyarrow.jemalloc.default_pool()) + gc.collect() + assert pyarrow.total_allocated_bytes() == bytes_before_default + assert pyarrow.jemalloc.default_pool().bytes_allocated() > bytes_before_jemalloc + +@jemalloc +def test_default_memory_pool(): + gc.collect() + bytes_before_default = pyarrow.total_allocated_bytes() + bytes_before_jemalloc = pyarrow.jemalloc.default_pool().bytes_allocated() + + old_memory_pool = pyarrow.memory.default_pool() + pyarrow.memory.set_default_pool(pyarrow.jemalloc.default_pool()) + array = pyarrow.from_pylist([1, None, 3, None]) + pyarrow.memory.set_default_pool(old_memory_pool) + gc.collect() + + assert pyarrow.total_allocated_bytes() == bytes_before_default + assert pyarrow.jemalloc.default_pool().bytes_allocated() > bytes_before_jemalloc + http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index a771d23..5f5e5f3 100644 --- a/python/setup.py +++ b/python/setup.py @@ -80,7 +80,8 @@ class build_ext(_build_ext): description = "Build the C-extensions for arrow" user_options = ([('extra-cmake-args=', None, 'extra arguments for CMake'), ('build-type=', None, 'build type (debug or release)'), - ('with-parquet', None, 'build the Parquet extension')] + + ('with-parquet', None, 'build the Parquet extension'), + ('with-jemalloc', 
None, 'build the jemalloc extension')] + _build_ext.user_options) def initialize_options(self): @@ -88,12 +89,15 @@ class build_ext(_build_ext): self.extra_cmake_args = os.environ.get('PYARROW_CMAKE_OPTIONS', '') self.build_type = os.environ.get('PYARROW_BUILD_TYPE', 'debug').lower() self.with_parquet = False + self.with_jemalloc = False CYTHON_MODULE_NAMES = [ 'array', 'config', 'error', 'io', + 'jemalloc', + 'memory', '_parquet', 'scalar', 'schema', @@ -135,6 +139,9 @@ class build_ext(_build_ext): if self.with_parquet: cmake_options.append('-DPYARROW_BUILD_PARQUET=on') + if self.with_jemalloc: + cmake_options.append('-DPYARROW_BUILD_JEMALLOC=on') + if sys.platform != 'win32': cmake_options.append('-DCMAKE_BUILD_TYPE={0}' .format(self.build_type)) @@ -216,6 +223,8 @@ class build_ext(_build_ext): def _failure_permitted(self, name): if name == '_parquet' and not self.with_parquet: return True + if name == 'jemalloc' and not self.with_jemalloc: + return True return False def _get_inplace_dir(self): http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/src/pyarrow/adapters/builtin.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc index fb7475f..1abfb40 100644 --- a/python/src/pyarrow/adapters/builtin.cc +++ b/python/src/pyarrow/adapters/builtin.cc @@ -29,6 +29,7 @@ using arrow::ArrayBuilder; using arrow::DataType; +using arrow::MemoryPool; using arrow::Status; using arrow::Type; @@ -495,7 +496,8 @@ Status ListConverter::Init(const std::shared_ptr<ArrayBuilder>& builder) { return Status::OK(); } -Status ConvertPySequence(PyObject* obj, std::shared_ptr<arrow::Array>* out) { +Status ConvertPySequence( + PyObject* obj, MemoryPool* pool, std::shared_ptr<arrow::Array>* out) { std::shared_ptr<DataType> type; int64_t size; PyDateTime_IMPORT; @@ -516,7 +518,7 @@ Status ConvertPySequence(PyObject* obj, std::shared_ptr<arrow::Array>* out) { // Give the 
sequence converter an array builder std::shared_ptr<ArrayBuilder> builder; - RETURN_NOT_OK(arrow::MakeBuilder(get_memory_pool(), type, &builder)); + RETURN_NOT_OK(arrow::MakeBuilder(pool, type, &builder)); converter->Init(builder); RETURN_NOT_OK(converter->AppendData(obj)); http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/src/pyarrow/adapters/builtin.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/builtin.h b/python/src/pyarrow/adapters/builtin.h index 1ff3694..667298e 100644 --- a/python/src/pyarrow/adapters/builtin.h +++ b/python/src/pyarrow/adapters/builtin.h @@ -38,7 +38,8 @@ class Status; namespace pyarrow { PYARROW_EXPORT -arrow::Status ConvertPySequence(PyObject* obj, std::shared_ptr<arrow::Array>* out); +arrow::Status ConvertPySequence( + PyObject* obj, arrow::MemoryPool* pool, std::shared_ptr<arrow::Array>* out); } // namespace pyarrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/src/pyarrow/common.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc index b8712d7..d2f5291 100644 --- a/python/src/pyarrow/common.cc +++ b/python/src/pyarrow/common.cc @@ -28,58 +28,21 @@ using arrow::Status; namespace pyarrow { -class PyArrowMemoryPool : public arrow::MemoryPool { - public: - PyArrowMemoryPool() : bytes_allocated_(0) {} - virtual ~PyArrowMemoryPool() {} +static std::mutex memory_pool_mutex; +static arrow::MemoryPool* default_pyarrow_pool = nullptr; - Status Allocate(int64_t size, uint8_t** out) override { - std::lock_guard<std::mutex> guard(pool_lock_); - *out = static_cast<uint8_t*>(std::malloc(size)); - if (*out == nullptr) { - std::stringstream ss; - ss << "malloc of size " << size << " failed"; - return Status::OutOfMemory(ss.str()); - } - - bytes_allocated_ += size; - - return Status::OK(); - } - - Status Reallocate(int64_t old_size, int64_t new_size, 
uint8_t** ptr) override { - *ptr = reinterpret_cast<uint8_t*>(std::realloc(*ptr, new_size)); - - if (*ptr == NULL) { - std::stringstream ss; - ss << "realloc of size " << new_size << " failed"; - return Status::OutOfMemory(ss.str()); - } - - bytes_allocated_ += new_size - old_size; - - return Status::OK(); - } - - int64_t bytes_allocated() const override { - std::lock_guard<std::mutex> guard(pool_lock_); - return bytes_allocated_; - } - - void Free(uint8_t* buffer, int64_t size) override { - std::lock_guard<std::mutex> guard(pool_lock_); - std::free(buffer); - bytes_allocated_ -= size; - } - - private: - mutable std::mutex pool_lock_; - int64_t bytes_allocated_; -}; +void set_default_memory_pool(arrow::MemoryPool* pool) { + std::lock_guard<std::mutex> guard(memory_pool_mutex); + default_pyarrow_pool = pool; +} arrow::MemoryPool* get_memory_pool() { - static PyArrowMemoryPool memory_pool; - return &memory_pool; + std::lock_guard<std::mutex> guard(memory_pool_mutex); + if (default_pyarrow_pool) { + return default_pyarrow_pool; + } else { + return arrow::default_memory_pool(); + } } // ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5b35d6bd/python/src/pyarrow/common.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index 0733a3b..ad65ec7 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -98,6 +98,7 @@ struct PyObjectStringify { } // Return the common PyArrow memory pool +PYARROW_EXPORT void set_default_memory_pool(arrow::MemoryPool* pool); PYARROW_EXPORT arrow::MemoryPool* get_memory_pool(); class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {