This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e82a34a  ARROW-530: [C++/Python] Provide subpools for better memory 
allocation …
e82a34a is described below

commit e82a34a3cb30a92a5ef00cca74d054e84cee7a6c
Author: Rok <[email protected]>
AuthorDate: Thu Jun 7 21:42:12 2018 +0200

    ARROW-530: [C++/Python] Provide subpools for better memory allocation 
tracking
    
    Currently we can only track the number of bytes allocated by the main 
memory pool or the alternative jemalloc implementation. To better understand 
certain situations, we should provide a MemoryPool proxy implementation that 
tracks only the amount of memory that was allocated through its direct calls 
but delegates the actual allocation to an underlying pool.
    
    Authors: Rok Mihevc <[email protected]> and Alex Hagerman 
<[email protected]>
    
    Usage example:
    ---------------------
    import pyarrow as pa
    
    def report(pool, proxy_pool):
        print("Total: ", pa.total_allocated_bytes())
        print("Default pool: ", pool.bytes_allocated())
        print("Proxy allocated: ", proxy_pool.proxy_bytes_allocated())
    
    pool = pa.default_memory_pool()
    proxy_pool = pa.ProxyMemoryPool(pool)
    
    report(pool, proxy_pool)
    a1 = pa.array([0]*1000, memory_pool=pool)
    report(pool, proxy_pool)
    a2 = pa.array([0]*1, memory_pool=proxy_pool)
    report(pool, proxy_pool)
    a3 = pa.array([0]*1000, memory_pool=proxy_pool)
    report(pool, proxy_pool)
    a3 = pa.array([0]*1000, memory_pool=proxy_pool)
    report(pool, proxy_pool)
    
    Result:
    ---------------------
    Total:  0
    Default pool:  0
    bytes_allocated: 0
    Proxy allocated:  0
    Total:  8128
    Default pool:  8128
    bytes_allocated: 0
    Proxy allocated:  0
    Allocate: size = 64
    Allocate: size = 256
    Reallocate: old_size = 256 - new_size = 64 - proxy_allocated - 128
    Total:  8256
    Default pool:  8256
    bytes_allocated: 128
    Proxy allocated:  128
    Allocate: size = 128
    Allocate: size = 8192
    Reallocate: old_size = 8192 - new_size = 8000 - proxy_allocated - 8256
    Total:  16384
    Default pool:  16384
    bytes_allocated: 8256
    Proxy allocated:  8256
    Allocate: size = 128
    Allocate: size = 8192
    Reallocate: old_size = 8192 - new_size = 8000 - proxy_allocated - 16384
    Free: size = 128
    Free: size = 8000
    Total:  16384
    Default pool:  16384
    bytes_allocated: 8256
    Proxy allocated:  8256
    
    Author: Rok <[email protected]>
    
    Closes #2057 from rok/master and squashes the following commits:
    
    eabd8a4f <Rok> changing ProxyMemoryPool instantiation
    96e58ee6 <Rok> fixing linting issue
    5bebf97d <Rok> adding docstring
    195a1367 <Rok> implementing feedback
    0645e500 <Rok> refactoring ProxyMemoryPool interface
    17a5c9b6 <Rok> ARROW-530 - C++ linter fix
    f90b8970 <Rok> C++ unit tests for ARROW-530 passing
    7c4b7692 <Rok> adding c++ unit tests
    62dcdb0a <Rok> ARROW-530: C++/Python: Provide subpools for better memory 
allocation tracking
---
 cpp/src/arrow/memory_pool-test.cc    | 21 +++++++++++++++++++++
 cpp/src/arrow/memory_pool.cc         | 36 ++++++++++++++++++++++++++++++++++++
 cpp/src/arrow/memory_pool.h          | 25 +++++++++++++++++++++++++
 python/pyarrow/__init__.py           |  2 +-
 python/pyarrow/includes/libarrow.pxd |  3 +++
 python/pyarrow/memory.pxi            | 13 +++++++++++++
 6 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/memory_pool-test.cc 
b/cpp/src/arrow/memory_pool-test.cc
index c5e3ef2..8915708 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -91,4 +91,25 @@ TEST(LoggingMemoryPool, Logging) {
 
   ASSERT_EQ(200, pool->max_memory());
 }
+
+TEST(ProxyMemoryPool, Logging) {
+  MemoryPool* pool = default_memory_pool();
+
+  ProxyMemoryPool pp(pool);
+
+  uint8_t* data;
+  ASSERT_OK(pool->Allocate(100, &data));
+
+  uint8_t* data2;
+  ASSERT_OK(pp.Allocate(300, &data2));
+
+  ASSERT_EQ(400, pool->bytes_allocated());
+  ASSERT_EQ(300, pp.bytes_allocated());
+
+  pool->Free(data, 100);
+  pp.Free(data2, 300);
+
+  ASSERT_EQ(0, pool->bytes_allocated());
+  ASSERT_EQ(0, pp.bytes_allocated());
+}
 }  // namespace arrow
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index dedab7e..34bd600 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -201,4 +201,40 @@ int64_t LoggingMemoryPool::max_memory() const {
   std::cout << "max_memory: " << mem << std::endl;
   return mem;
 }
+
+ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) : pool_(pool) {}
+
+Status ProxyMemoryPool::Allocate(int64_t size, uint8_t** out) {
+  RETURN_NOT_OK(pool_->Allocate(size, out));
+  bytes_allocated_ += size;
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (bytes_allocated_ > max_memory_) {
+      max_memory_ = bytes_allocated_.load();
+    }
+  }
+  return Status::OK();
+}
+
+Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, 
uint8_t** ptr) {
+  RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
+  bytes_allocated_ += new_size - old_size;
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (bytes_allocated_ > max_memory_) {
+      max_memory_ = bytes_allocated_.load();
+    }
+  }
+  return Status::OK();
+}
+
+void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size) {
+  pool_->Free(buffer, size);
+  bytes_allocated_ -= size;
+}
+
+int64_t ProxyMemoryPool::bytes_allocated() const { return 
bytes_allocated_.load(); }
+
+int64_t ProxyMemoryPool::max_memory() const { return max_memory_.load(); }
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 348343b..de58896 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -20,6 +20,7 @@
 
 #include <atomic>
 #include <cstdint>
+#include <mutex>
 
 #include "arrow/util/visibility.h"
 
@@ -86,6 +87,30 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
   MemoryPool* pool_;
 };
 
+/// Derived class for memory allocation.
+///
+/// Tracks the number of bytes and maximum memory allocated through its direct
+/// calls. Actual allocation is delegated to MemoryPool class.
+class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
+ public:
+  explicit ProxyMemoryPool(MemoryPool* pool);
+
+  Status Allocate(int64_t size, uint8_t** out) override;
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) 
override;
+
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  int64_t bytes_allocated() const override;
+
+  int64_t max_memory() const override;
+
+ private:
+  mutable std::mutex lock_;
+  MemoryPool* pool_;
+  std::atomic<int64_t> bytes_allocated_{0};
+  std::atomic<int64_t> max_memory_{0};
+};
+
 ARROW_EXPORT MemoryPool* default_memory_pool();
 
 #ifdef ARROW_NO_DEFAULT_MEMORY_POOL
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 6a75c26..89dfd03 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -87,7 +87,7 @@ from pyarrow.lib import (null, bool_,
 from pyarrow.lib import (Buffer, ResizableBuffer, foreign_buffer, py_buffer,
                          compress, decompress, allocate_buffer)
 
-from pyarrow.lib import (MemoryPool, total_allocated_bytes,
+from pyarrow.lib import (MemoryPool, ProxyMemoryPool, total_allocated_bytes,
                          set_memory_pool, default_memory_pool,
                          log_memory_allocations)
 
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index a794444..defb91d 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -191,6 +191,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass CLoggingMemoryPool" arrow::LoggingMemoryPool"(CMemoryPool):
         CLoggingMemoryPool(CMemoryPool*)
 
+    cdef cppclass CProxyMemoryPool" arrow::ProxyMemoryPool"(CMemoryPool):
+        CProxyMemoryPool(CMemoryPool*)
+
     cdef cppclass CBuffer" arrow::Buffer":
         CBuffer(const uint8_t* data, int64_t size)
         const uint8_t* data()
diff --git a/python/pyarrow/memory.pxi b/python/pyarrow/memory.pxi
index 3d2601f..1a461ef 100644
--- a/python/pyarrow/memory.pxi
+++ b/python/pyarrow/memory.pxi
@@ -44,6 +44,19 @@ cdef class LoggingMemoryPool(MemoryPool):
         self.init(self.logging_pool.get())
 
 
+cdef class ProxyMemoryPool(MemoryPool):
+    """
+    Derived MemoryPool class that tracks the number of bytes and
+    maximum memory allocated through its direct calls.
+    """
+    cdef:
+        unique_ptr[CProxyMemoryPool] proxy_pool
+
+    def __cinit__(self, MemoryPool pool):
+        self.proxy_pool.reset(new CProxyMemoryPool(pool.pool))
+        self.init(self.proxy_pool.get())
+
+
 def default_memory_pool():
     cdef:
         MemoryPool pool = MemoryPool()

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to