This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e82a34a ARROW-530: [C++/Python] Provide subpools for better memory
allocation …
e82a34a is described below
commit e82a34a3cb30a92a5ef00cca74d054e84cee7a6c
Author: Rok <[email protected]>
AuthorDate: Thu Jun 7 21:42:12 2018 +0200
ARROW-530: [C++/Python] Provide subpools for better memory allocation tracking
Currently we can only track the number of bytes allocated by the main
memory pool or the alternative jemalloc implementation. To better understand
certain situations, we should provide a MemoryPool proxy implementation that
tracks only the memory allocated through its own direct calls, but
delegates the actual allocation to an underlying pool.
Authors: Rok Mihevc <[email protected]> and Alex Hagerman
<[email protected]>
Usage example:
---------------------
import pyarrow as pa


def report(pool, proxy_pool):
    """Print the global, parent-pool and proxy-pool allocation counters."""
    print("Total: ", pa.total_allocated_bytes())
    print("Default pool: ", pool.bytes_allocated())
    # ProxyMemoryPool exposes the regular MemoryPool.bytes_allocated(); for a
    # proxy it counts only the bytes requested through the proxy itself.
    print("Proxy allocated: ", proxy_pool.bytes_allocated())


pool = pa.default_memory_pool()
proxy_pool = pa.ProxyMemoryPool(pool)
report(pool, proxy_pool)
# Allocated directly from the parent pool: the proxy counter stays unchanged.
a1 = pa.array([0]*1000, memory_pool=pool)
report(pool, proxy_pool)
# Allocated through the proxy: both counters grow.
a2 = pa.array([0]*1, memory_pool=proxy_pool)
report(pool, proxy_pool)
a3 = pa.array([0]*1000, memory_pool=proxy_pool)
report(pool, proxy_pool)
# Rebinding a3 frees the previous array's buffers back through the proxy.
a3 = pa.array([0]*1000, memory_pool=proxy_pool)
report(pool, proxy_pool)
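Result (captured from an instrumented build: the "Allocate:", "Reallocate:",
"Free:" and "bytes_allocated:" lines are debug prints that the code in this
commit does not emit):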
---------------------
Total: 0
Default pool: 0
bytes_allocated: 0
Proxy allocated: 0
Total: 8128
Default pool: 8128
bytes_allocated: 0
Proxy allocated: 0
Allocate: size = 64
Allocate: size = 256
Reallocate: old_size = 256 - new_size = 64 - proxy_allocated - 128
Total: 8256
Default pool: 8256
bytes_allocated: 128
Proxy allocated: 128
Allocate: size = 128
Allocate: size = 8192
Reallocate: old_size = 8192 - new_size = 8000 - proxy_allocated - 8256
Total: 16384
Default pool: 16384
bytes_allocated: 8256
Proxy allocated: 8256
Allocate: size = 128
Allocate: size = 8192
Reallocate: old_size = 8192 - new_size = 8000 - proxy_allocated - 16384
Free: size = 128
Free: size = 8000
Total: 16384
Default pool: 16384
bytes_allocated: 8256
Proxy allocated: 8256
Author: Rok <[email protected]>
Closes #2057 from rok/master and squashes the following commits:
eabd8a4f <Rok> changing ProxyMemoryPool instantiation
96e58ee6 <Rok> fixing linting issue
5bebf97d <Rok> adding docstring
195a1367 <Rok> implementing feedback
0645e500 <Rok> refactoring ProxyMemoryPool interface
17a5c9b6 <Rok> ARROW-530 - C++ linter fix
f90b8970 <Rok> C++ unit tests for ARROW-530 passing
7c4b7692 <Rok> adding c++ unit tests
62dcdb0a <Rok> ARROW-530: C++/Python: Provide subpools for better memory
allocation tracking
---
cpp/src/arrow/memory_pool-test.cc | 21 +++++++++++++++++++++
cpp/src/arrow/memory_pool.cc | 36 ++++++++++++++++++++++++++++++++++++
cpp/src/arrow/memory_pool.h | 25 +++++++++++++++++++++++++
python/pyarrow/__init__.py | 2 +-
python/pyarrow/includes/libarrow.pxd | 3 +++
python/pyarrow/memory.pxi | 13 +++++++++++++
6 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index c5e3ef2..8915708 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -91,4 +91,25 @@ TEST(LoggingMemoryPool, Logging) {
ASSERT_EQ(200, pool->max_memory());
}
+
+TEST(ProxyMemoryPool, Tracking) {
+  MemoryPool* pool = default_memory_pool();
+  // Compare deltas: the default pool is process-wide, so other tests in the
+  // same binary may legitimately hold allocations in it.
+  const int64_t pool_baseline = pool->bytes_allocated();
+
+  ProxyMemoryPool pp(pool);
+
+  uint8_t* data;
+  ASSERT_OK(pool->Allocate(100, &data));
+
+  uint8_t* data2;
+  ASSERT_OK(pp.Allocate(300, &data2));
+
+  // The underlying pool sees both allocations; the proxy only its own.
+  ASSERT_EQ(pool_baseline + 400, pool->bytes_allocated());
+  ASSERT_EQ(300, pp.bytes_allocated());
+  ASSERT_EQ(300, pp.max_memory());
+
+  // Growing through the proxy is tracked as a delta and raises the peak.
+  ASSERT_OK(pp.Reallocate(300, 500, &data2));
+  ASSERT_EQ(pool_baseline + 600, pool->bytes_allocated());
+  ASSERT_EQ(500, pp.bytes_allocated());
+  ASSERT_EQ(500, pp.max_memory());
+
+  pool->Free(data, 100);
+  pp.Free(data2, 500);
+
+  ASSERT_EQ(pool_baseline, pool->bytes_allocated());
+  ASSERT_EQ(0, pp.bytes_allocated());
+  // Free() does not lower the recorded peak.
+  ASSERT_EQ(500, pp.max_memory());
+}
} // namespace arrow
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index dedab7e..34bd600 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -201,4 +201,40 @@ int64_t LoggingMemoryPool::max_memory() const {
std::cout << "max_memory: " << mem << std::endl;
return mem;
}
+
+// The proxy does not own `pool`; it must outlive this object.
+ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) : pool_(pool) {}
+
+Status ProxyMemoryPool::Allocate(int64_t size, uint8_t** out) {
+  RETURN_NOT_OK(pool_->Allocate(size, out));
+  // Use the value produced by this very update instead of re-reading the
+  // atomic: a concurrent Free() between the increment and the lock would
+  // otherwise make the peak miss the level this allocation reached.
+  const int64_t allocated = (bytes_allocated_ += size);
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (allocated > max_memory_) {
+      max_memory_ = allocated;
+    }
+  }
+  return Status::OK();
+}
+
+Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
+  RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
+  // Only the size difference changes this proxy's footprint; see Allocate()
+  // for why the post-update value is captured once.
+  const int64_t allocated = (bytes_allocated_ += new_size - old_size);
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (allocated > max_memory_) {
+      max_memory_ = allocated;
+    }
+  }
+  return Status::OK();
+}
+
+void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size) {
+  pool_->Free(buffer, size);
+  // The peak (max_memory_) is intentionally left untouched.
+  bytes_allocated_ -= size;
+}
+
+int64_t ProxyMemoryPool::bytes_allocated() const { return bytes_allocated_.load(); }
+
+int64_t ProxyMemoryPool::max_memory() const { return max_memory_.load(); }
+
} // namespace arrow
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 348343b..de58896 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -20,6 +20,7 @@
#include <atomic>
#include <cstdint>
+#include <mutex>
#include "arrow/util/visibility.h"
@@ -86,6 +87,30 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
MemoryPool* pool_;
};
+/// A MemoryPool that forwards every call to another pool while keeping its
+/// own statistics.
+///
+/// bytes_allocated() and max_memory() reflect only the memory requested
+/// through this proxy's own Allocate/Reallocate/Free calls; the actual
+/// allocation is performed by the wrapped pool. The wrapped pool is not
+/// owned and must outlive the proxy.
+class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
+ public:
+  /// \param[in] pool the pool that performs the allocations (not owned)
+  explicit ProxyMemoryPool(MemoryPool* pool);
+
+  Status Allocate(int64_t size, uint8_t** out) override;
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
+
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  /// Bytes currently allocated through this proxy.
+  int64_t bytes_allocated() const override;
+
+  /// High-water mark of bytes allocated through this proxy.
+  int64_t max_memory() const override;
+
+ private:
+  // Serializes the compare-and-update of max_memory_.
+  mutable std::mutex lock_;
+  // Pool that performs the real allocations; not owned.
+  MemoryPool* pool_;
+  std::atomic<int64_t> bytes_allocated_{0};
+  std::atomic<int64_t> max_memory_{0};
+};
+
ARROW_EXPORT MemoryPool* default_memory_pool();
#ifdef ARROW_NO_DEFAULT_MEMORY_POOL
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 6a75c26..89dfd03 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -87,7 +87,7 @@ from pyarrow.lib import (null, bool_,
from pyarrow.lib import (Buffer, ResizableBuffer, foreign_buffer, py_buffer,
compress, decompress, allocate_buffer)
-from pyarrow.lib import (MemoryPool, total_allocated_bytes,
+from pyarrow.lib import (MemoryPool, ProxyMemoryPool, total_allocated_bytes,
set_memory_pool, default_memory_pool,
log_memory_allocations)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a794444..defb91d 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -191,6 +191,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef cppclass CLoggingMemoryPool" arrow::LoggingMemoryPool"(CMemoryPool):
CLoggingMemoryPool(CMemoryPool*)
+    # Non-owning proxy: forwards allocations to the given pool and tracks only
+    # the bytes requested through itself.
+    cdef cppclass CProxyMemoryPool" arrow::ProxyMemoryPool"(CMemoryPool):
+        CProxyMemoryPool(CMemoryPool*)
+
cdef cppclass CBuffer" arrow::Buffer":
CBuffer(const uint8_t* data, int64_t size)
const uint8_t* data()
diff --git a/python/pyarrow/memory.pxi b/python/pyarrow/memory.pxi
index 3d2601f..1a461ef 100644
--- a/python/pyarrow/memory.pxi
+++ b/python/pyarrow/memory.pxi
@@ -44,6 +44,19 @@ cdef class LoggingMemoryPool(MemoryPool):
self.init(self.logging_pool.get())
+cdef class ProxyMemoryPool(MemoryPool):
+    """
+    Derived MemoryPool class that tracks the number of bytes and
+    maximum memory allocated through its direct calls.
+
+    The actual allocations are delegated to the parent pool, which is kept
+    alive for as long as this proxy exists.
+
+    Parameters
+    ----------
+    pool : MemoryPool
+        The pool that performs the actual allocations.
+    """
+    cdef:
+        unique_ptr[CProxyMemoryPool] proxy_pool
+        # Strong reference to the parent: the C++ proxy only keeps a raw
+        # pointer to parent_pool.pool, so the parent (and any C++ pool it
+        # owns) must not be collected while this proxy is alive.
+        MemoryPool parent_pool
+
+    def __cinit__(self, MemoryPool pool not None):
+        self.parent_pool = pool
+        self.proxy_pool.reset(new CProxyMemoryPool(pool.pool))
+        self.init(self.proxy_pool.get())
+
+
def default_memory_pool():
cdef:
MemoryPool pool = MemoryPool()
--
To stop receiving notification emails like this one, please contact
[email protected].