Repository: arrow
Updated Branches:
  refs/heads/master 31f145dc5 -> dc6cefde4
ARROW-521: [C++] Track peak allocations in default memory pool

This should enable us to remove the `parquet::MemoryAllocator` implementation in parquet-cpp

Author: Wes McKinney <[email protected]>

Closes #330 from wesm/ARROW-521 and squashes the following commits:

10531c4 [Wes McKinney] Move max_memory_ member to DefaultMemoryPool, add default virtual max_memory() to MemoryPool
a0d134d [Wes McKinney] Add max_memory() method to MemoryPool, leave implementation to subclasses

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/dc6cefde
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/dc6cefde
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/dc6cefde

Branch: refs/heads/master
Commit: dc6cefde46c65ce4753bec3fbafc44a20944f9c9
Parents: 31f145d
Author: Wes McKinney <[email protected]>
Authored: Thu Feb 9 10:43:53 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Thu Feb 9 10:43:53 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/array-primitive-test.cc      |  3 +-
 cpp/src/arrow/buffer-test.cc               |  2 +-
 cpp/src/arrow/ipc/json-integration-test.cc |  6 ++-
 cpp/src/arrow/memory_pool-test.cc          | 17 ++++++++
 cpp/src/arrow/memory_pool.cc               | 54 ++++++++++++-------------
 cpp/src/arrow/memory_pool.h                | 31 ++++++++++++++
 cpp/src/arrow/table.cc                     |  6 ++-
 cpp/src/arrow/util/logging.h               |  4 +-
 cpp/src/arrow/util/macros.h                |  2 +-
 9 files changed, 89 insertions(+), 36 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/array-primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-primitive-test.cc b/cpp/src/arrow/array-primitive-test.cc
index a20fdbf..f8bbd77 100644
--- a/cpp/src/arrow/array-primitive-test.cc
+++ b/cpp/src/arrow/array-primitive-test.cc
@@ -242,7 +242,8 @@ void
TestPrimitiveBuilder<PBoolean>::Check( } typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16, - PInt32, PInt64, PFloat, PDouble> Primitives; + PInt32, PInt64, PFloat, PDouble> + Primitives; TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives); http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/buffer-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/buffer-test.cc b/cpp/src/arrow/buffer-test.cc index d76e991..934fcfe 100644 --- a/cpp/src/arrow/buffer-test.cc +++ b/cpp/src/arrow/buffer-test.cc @@ -67,7 +67,7 @@ TEST_F(TestBuffer, Resize) { } TEST_F(TestBuffer, ResizeOOM) { - // This test doesn't play nice with AddressSanitizer +// This test doesn't play nice with AddressSanitizer #ifndef ADDRESS_SANITIZER // realloc fails, even though there may be no explicit limit PoolBuffer buf; http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 17ccc4a..95bc742 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -144,8 +144,10 @@ static Status ValidateArrowVsJson( if (!json_schema->Equals(arrow_schema)) { std::stringstream ss; - ss << "JSON schema: \n" << json_schema->ToString() << "\n" - << "Arrow schema: \n" << arrow_schema->ToString(); + ss << "JSON schema: \n" + << json_schema->ToString() << "\n" + << "Arrow schema: \n" + << arrow_schema->ToString(); if (FLAGS_verbose) { std::cout << ss.str() << std::endl; } return Status::Invalid("Schemas did not match"); http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool-test.cc 
b/cpp/src/arrow/memory_pool-test.cc index 56bb32f..6ab73fb 100644 --- a/cpp/src/arrow/memory_pool-test.cc +++ b/cpp/src/arrow/memory_pool-test.cc @@ -59,6 +59,23 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) { pool->Free(data, 100); } +TEST(DefaultMemoryPoolDeathTest, MaxMemory) { + DefaultMemoryPool pool; + + ASSERT_EQ(0, pool.max_memory()); + + uint8_t* data; + ASSERT_OK(pool.Allocate(100, &data)); + + uint8_t* data2; + ASSERT_OK(pool.Allocate(100, &data2)); + + pool.Free(data, 100); + pool.Free(data2, 100); + + ASSERT_EQ(200, pool.max_memory()); +} + #endif // ARROW_VALGRIND } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index aea5e21..8d85a08 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -60,36 +60,30 @@ Status AllocateAligned(int64_t size, uint8_t** out) { } } // namespace -MemoryPool::~MemoryPool() {} - -class InternalMemoryPool : public MemoryPool { - public: - InternalMemoryPool() : bytes_allocated_(0) {} - virtual ~InternalMemoryPool(); - - Status Allocate(int64_t size, uint8_t** out) override; - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; +MemoryPool::MemoryPool() {} - void Free(uint8_t* buffer, int64_t size) override; +MemoryPool::~MemoryPool() {} - int64_t bytes_allocated() const override; +int64_t MemoryPool::max_memory() const { + return -1; +} - private: - mutable std::mutex pool_lock_; - int64_t bytes_allocated_; -}; +DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) { + max_memory_ = 0; +} -Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) { - std::lock_guard<std::mutex> guard(pool_lock_); +Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) { RETURN_NOT_OK(AllocateAligned(size, out)); bytes_allocated_ += size; + { + 
std::lock_guard<std::mutex> guard(lock_); + if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); } + } return Status::OK(); } -Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { - std::lock_guard<std::mutex> guard(pool_lock_); - +Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { // Note: We cannot use realloc() here as it doesn't guarantee alignment. // Allocate new chunk @@ -105,17 +99,19 @@ Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_ *ptr = out; bytes_allocated_ += new_size - old_size; + { + std::lock_guard<std::mutex> guard(lock_); + if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); } + } return Status::OK(); } -int64_t InternalMemoryPool::bytes_allocated() const { - std::lock_guard<std::mutex> guard(pool_lock_); - return bytes_allocated_; +int64_t DefaultMemoryPool::bytes_allocated() const { + return bytes_allocated_.load(); } -void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) { - std::lock_guard<std::mutex> guard(pool_lock_); +void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) { DCHECK_GE(bytes_allocated_, size); #ifdef _MSC_VER _aligned_free(buffer); @@ -125,10 +121,14 @@ void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) { bytes_allocated_ -= size; } -InternalMemoryPool::~InternalMemoryPool() {} +int64_t DefaultMemoryPool::max_memory() const { + return max_memory_.load(); +} + +DefaultMemoryPool::~DefaultMemoryPool() {} MemoryPool* default_memory_pool() { - static InternalMemoryPool default_memory_pool_; + static DefaultMemoryPool default_memory_pool_; return &default_memory_pool_; } http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 89477b6..33d4c3e 100644 --- 
a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -18,7 +18,9 @@ #ifndef ARROW_UTIL_MEMORY_POOL_H #define ARROW_UTIL_MEMORY_POOL_H +#include <atomic> #include <cstdint> +#include <mutex> #include "arrow/util/visibility.h" @@ -56,6 +58,35 @@ class ARROW_EXPORT MemoryPool { /// The number of bytes that were allocated and not yet free'd through /// this allocator. virtual int64_t bytes_allocated() const = 0; + + /// Return peak memory allocation in this memory pool + /// + /// \return Maximum bytes allocated. If not known (or not implemented), + /// returns -1 + virtual int64_t max_memory() const; + + protected: + MemoryPool(); +}; + +class ARROW_EXPORT DefaultMemoryPool : public MemoryPool { + public: + DefaultMemoryPool(); + virtual ~DefaultMemoryPool(); + + Status Allocate(int64_t size, uint8_t** out) override; + Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; + + void Free(uint8_t* buffer, int64_t size) override; + + int64_t bytes_allocated() const override; + + int64_t max_memory() const override; + + private: + mutable std::mutex lock_; + std::atomic<int64_t> bytes_allocated_; + std::atomic<int64_t> max_memory_; }; ARROW_EXPORT MemoryPool* default_memory_pool(); http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/table.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 9e31ba5..a9e0909 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -106,7 +106,8 @@ Status Table::FromRecordBatches(const std::string& name, if (!batches[i]->schema()->Equals(schema)) { std::stringstream ss; ss << "Schema at index " << static_cast<int>(i) << " was different: \n" - << schema->ToString() << "\nvs\n" << batches[i]->schema()->ToString(); + << schema->ToString() << "\nvs\n" + << batches[i]->schema()->ToString(); return Status::Invalid(ss.str()); } } @@ -138,7 +139,8 @@ Status ConcatenateTables(const std::string& 
output_name, if (!tables[i]->schema()->Equals(schema)) { std::stringstream ss; ss << "Schema at index " << static_cast<int>(i) << " was different: \n" - << schema->ToString() << "\nvs\n" << tables[i]->schema()->ToString(); + << schema->ToString() << "\nvs\n" + << tables[i]->schema()->ToString(); return Status::Invalid(ss.str()); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/util/logging.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 06ee841..b22f07d 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -118,9 +118,9 @@ class CerrLog { class FatalLog : public CerrLog { public: explicit FatalLog(int /* severity */) // NOLINT - : CerrLog(ARROW_FATAL) {} // NOLINT + : CerrLog(ARROW_FATAL){} // NOLINT - [[noreturn]] ~FatalLog() { + [[noreturn]] ~FatalLog() { if (has_logged_) { std::cerr << std::endl; } std::exit(1); } http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/util/macros.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h index 81a9b0c..c4a62a4 100644 --- a/cpp/src/arrow/util/macros.h +++ b/cpp/src/arrow/util/macros.h @@ -25,6 +25,6 @@ TypeName& operator=(const TypeName&) = delete #endif -#define UNUSED(x) (void) x +#define UNUSED(x) (void)x #endif // ARROW_UTIL_MACROS_H
