pitrou commented on code in PR #48108:
URL: https://github.com/apache/arrow/pull/48108#discussion_r2526600506
##########
cpp/src/arrow/memory_pool.h:
##########
@@ -245,6 +246,73 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
std::unique_ptr<ProxyMemoryPoolImpl> impl_;
};
+/// EXPERIMENTAL MemoryPool wrapper with an upper limit
+///
+/// Checking for limits is not done in a fully thread-safe way, therefore
+/// multi-threaded allocations might be able to go successfully above the
+/// configured limit.
+class ARROW_EXPORT CappedMemoryPool : public MemoryPool {
+ public:
+ CappedMemoryPool(MemoryPool* wrapped_pool, int64_t bytes_allocated_limit)
+ : wrapped_(wrapped_pool), bytes_allocated_limit_(bytes_allocated_limit)
{}
+
+ using MemoryPool::Allocate;
+ using MemoryPool::Reallocate;
Review Comment:
At least they were when `ProxyMemoryPool` was written :-)
I think they're necessary to inherit the other Allocate/Reallocate overloads
while overriding the two below.
##########
cpp/src/arrow/memory_pool.h:
##########
@@ -245,6 +246,73 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
std::unique_ptr<ProxyMemoryPoolImpl> impl_;
};
+/// EXPERIMENTAL MemoryPool wrapper with an upper limit
+///
+/// Checking for limits is not done in a fully thread-safe way, therefore
+/// multi-threaded allocations might be able to go successfully above the
+/// configured limit.
+class ARROW_EXPORT CappedMemoryPool : public MemoryPool {
+ public:
+ CappedMemoryPool(MemoryPool* wrapped_pool, int64_t bytes_allocated_limit)
+ : wrapped_(wrapped_pool), bytes_allocated_limit_(bytes_allocated_limit)
{}
+
+ using MemoryPool::Allocate;
+ using MemoryPool::Reallocate;
+
+ Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
+ // XXX Another thread may allocate memory between the limit check and
+ // the `Allocate` call. It is possible for the two allocations to be
successful
+ // while going above the limit.
+ // Solving this issue would require refactoring the `MemoryPool`
implementation
+ // to delegate the limit check to `MemoryPoolStats`.
+ const auto attempted = size + wrapped_->bytes_allocated();
+ if (ARROW_PREDICT_FALSE(attempted > bytes_allocated_limit_)) {
+ return OutOfMemory(attempted);
+ }
+ return wrapped_->Allocate(size, alignment, out);
+ }
+
+ Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
+ uint8_t** ptr) override {
+ const auto attempted = new_size - old_size + wrapped_->bytes_allocated();
+ if (ARROW_PREDICT_FALSE(attempted > bytes_allocated_limit_)) {
+ return OutOfMemory(attempted);
+ }
+ return wrapped_->Reallocate(old_size, new_size, alignment, ptr);
+ }
+
+ void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
+ return wrapped_->Free(buffer, size, alignment);
+ }
+
+ void ReleaseUnused() override { wrapped_->ReleaseUnused(); }
+
+ void PrintStats() override { wrapped_->PrintStats(); }
+
+ int64_t bytes_allocated() const override { return
wrapped_->bytes_allocated(); }
+
+ int64_t max_memory() const override { return wrapped_->max_memory(); }
+
+ int64_t total_bytes_allocated() const override {
+ return wrapped_->total_bytes_allocated();
+ }
+
+ int64_t num_allocations() const override { return
wrapped_->num_allocations(); }
+
+ std::string backend_name() const override { return wrapped_->backend_name();
}
+
+ private:
+ Status OutOfMemory(int64_t value) {
+ return Status::OutOfMemory(
+ "MemoryPool bytes_allocated cap exceeded: "
+ "limit=",
+ bytes_allocated_limit_, ", attempted=", value);
+ }
+
+ MemoryPool* wrapped_;
+ int64_t bytes_allocated_limit_;
Review Comment:
I'll give it a try indeed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]