pitrou commented on a change in pull request #12116:
URL: https://github.com/apache/arrow/pull/12116#discussion_r805997122
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
Review comment:
I'm not sure this is useful, because the underlying allocator probably
already relies on mmap for large-ish allocations.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
Review comment:
This algorithm seems quite complicated and costly (in terms of
inter-thread synchronization).
Instead, how about maintaining lazily-allocated power-of-two-sized entries
(perhaps starting with 4096 to avoid having a ton of tiny size classes)?
Something like:
```c++
static constexpr int kMinPower2Size = 12;  // 4096 bytes
static constexpr int kNumAllocClasses = 64 - kMinPower2Size;
// Addresses of allocated zeros, for each size class
std::array<std::atomic<uintptr_t>, kNumAllocClasses> allocated_;
std::mutex allocation_mutex_;

Result<const uint8_t*> GetImmutableZeros(int64_t size) override {
  const auto size_class = std::max(0, bit_util::Log2(size) - kMinPower2Size);
  auto addr = allocated_[size_class].load();
  if (ARROW_PREDICT_TRUE(addr != 0)) {
    // Fast path: already allocated
    return reinterpret_cast<const uint8_t*>(addr);
  }
  // Slow path: allocate if not already done
  std::lock_guard<std::mutex> lock(allocation_mutex_);
  addr = allocated_[size_class].load();
  if (addr == 0) {
    const int64_t alloc_size = static_cast<int64_t>(1) << (size_class + kMinPower2Size);
    // (Assumes a Result-returning variant of AllocateImmutableZeros.)
    ARROW_ASSIGN_OR_RAISE(const uint8_t* data, AllocateImmutableZeros(alloc_size));
    allocated_[size_class] = addr = reinterpret_cast<uintptr_t>(data);
  }
  return reinterpret_cast<const uint8_t*>(addr);
}
```
##########
File path: cpp/src/arrow/device.h
##########
@@ -223,4 +236,55 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager {
ARROW_EXPORT
std::shared_ptr<MemoryManager> default_cpu_memory_manager();
+/// A memory manager that uses the immutable zeros interface of the given memory pool,
+/// rather than the normal mutable buffer interface.
Review comment:
Hmm, I honestly don't understand why it would be useful to implement
this, given that we're already exposing additional methods on `MemoryPool`.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -536,6 +543,39 @@ int64_t MemoryPool::max_memory() const { return -1; }
// MemoryPool implementation that delegates its core duty
// to an Allocator class.
+class ImmutableZeros : public Buffer {
+ public:
+ explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool)
+ : Buffer(data, size, CPUDevice::immutable_zeros_memory_manager(pool)),
+ pool_(pool) {}
+
+ ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr) {}
+
+ ~ImmutableZeros() override;
+
+ // Prevent copies and handle moves explicitly to avoid double free
Review comment:
It sounds hackish to have a separate ImmutableZeros class, IMHO.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
Review comment:
Ah, you're right, we don't know that the allocator returned us
zero-initialized memory, unfortunately.
(Note that both jemalloc and mimalloc have zero-initializing allocation
APIs, so we could use those.)
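For illustration, a rough sketch of what using those APIs could look like; the build guards and unprefixed symbol names here are assumptions for the sake of the example, not code from this PR:
```c++
// Sketch only: zero-initialized allocation via the allocator itself,
// instead of Allocate() followed by memset().
#include <cstdint>
#include <cstdlib>
#if defined(ARROW_JEMALLOC)
#include <jemalloc/jemalloc.h>
#elif defined(ARROW_MIMALLOC)
#include <mimalloc.h>
#endif

uint8_t* AllocateZeroed(int64_t size) {
#if defined(ARROW_JEMALLOC)
  // MALLOCX_ZERO asks jemalloc for zero-initialized memory.
  return static_cast<uint8_t*>(mallocx(static_cast<size_t>(size), MALLOCX_ZERO));
#elif defined(ARROW_MIMALLOC)
  // mi_zalloc returns zero-initialized memory.
  return static_cast<uint8_t*>(mi_zalloc(static_cast<size_t>(size)));
#else
  // Portable fallback: calloc also guarantees zeroed memory.
  return static_cast<uint8_t*>(std::calloc(1, static_cast<size_t>(size)));
#endif
}
```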
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
Review comment:
> An inability to free something, especially if that something is large,
feels like bad news to me, so I'm hesitant to just copy your version in and
call it a day.
That is true. However, it shouldn't be a concern if we can ensure that the
pages don't actually consume physical memory (or almost none of it, as with
`/dev/zero` mappings).
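To illustrate the point, a standalone sketch (Linux-only, not part of this PR): a private read-only anonymous mapping is never written to, so every page should resolve to the kernel's shared zero page and even a huge mapping should barely show up in resident memory.
```c++
#include <sys/mman.h>
#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
  const std::size_t kSize = std::size_t{1} << 30;  // 1 GiB of read-only zeros
  void* p = mmap(nullptr, kSize, PROT_READ,
                 MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0);
  if (p == MAP_FAILED) return 1;
  // Read every page; RSS stays tiny (check VmRSS in /proc/self/status).
  const auto* bytes = static_cast<const uint8_t*>(p);
  std::uint64_t sum = 0;
  for (std::size_t i = 0; i < kSize; i += 4096) sum += bytes[i];
  std::printf("sum = %llu\n", static_cast<unsigned long long>(sum));
  munmap(p, kSize);
  return 0;
}
```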
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
Review comment:
> * I'm not sure if it's even needed, though: it isn't if you can safely
make a copy of a `shared_ptr` while another thread may be updating the
contained pointer.
You can safely update the object _pointed to_. However, changing the
_pointer_ requires use of dedicated atomic access functions:
https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
Review comment:
> * I'm not sure if it's even needed, though: it isn't if you can safely
make a copy of a `shared_ptr` while another thread may be updating the
contained pointer.
You can safely update the object _pointed to_ (well, except that accessing
that object from different threads then becomes racy). However, changing the
_pointer_ requires use of dedicated atomic access functions:
https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
Review comment:
> * I'm not sure if it's even needed, though: it isn't if you can safely
make a copy of a `shared_ptr` while another thread may be updating the
contained pointer.
You can safely update the object _pointed to_ (well, except that accessing
that object from different threads then becomes racy). However, changing the
_pointer_ requires use of dedicated atomic access functions:
https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic
I guess this means you should favour changing the pointer, since all
accesses will be confined to this utility function.
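For completeness, a minimal sketch of those functions in use: a double-checked cache analogous to the PR's `immutable_zeros_cache_`, where `AllocateZeros` is a made-up helper:
```c++
#include <memory>
#include <mutex>
#include "arrow/buffer.h"

class ZerosCache {
 public:
  std::shared_ptr<arrow::Buffer> Get(int64_t size) {
    // Safe concurrent read of the shared_ptr object itself.
    auto current = std::atomic_load(&cache_);
    if (current && current->size() >= size) return current;
    std::lock_guard<std::mutex> lock(mutex_);
    current = std::atomic_load(&cache_);
    if (!current || current->size() < size) {
      current = AllocateZeros(size);  // hypothetical allocation helper
      // Safe concurrent write; a plain `cache_ = current;` would race with
      // the lock-free load above.
      std::atomic_store(&cache_, current);
    }
    return current;
  }

 private:
  std::shared_ptr<arrow::Buffer> AllocateZeros(int64_t size);  // hypothetical
  std::shared_ptr<arrow::Buffer> cache_;
  std::mutex mutex_;
};
```
(These free functions are what C++11 provides; C++20 deprecates them in favour of `std::atomic<std::shared_ptr<T>>`.)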
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
Review comment:
@jvanstraten You're right. So let's keep it like this and just explain
why in a comment.
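For instance, something along these lines (the wording is only a suggestion):
```c++
// Use a dedicated read-only anonymous mapping instead of the regular
// allocator: the allocator does not guarantee zero-initialized memory, and
// since these pages are never written, a MAP_NORESERVE read-only mapping
// consumes (almost) no physical memory regardless of its size.
```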
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
Review comment:
Can you hide the entire mmap() handling behind a compatibility wrapper
in `arrow/util/io_util.{h,cc}`, so that we can easily add Windows and macOS
support?
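A rough sketch of what that wrapper's interface could look like (names and signatures are made up for illustration):
```c++
// arrow/util/io_util.h (hypothetical additions)
namespace arrow {
namespace internal {

// Map `size` bytes of read-only, zero-initialized memory without reserving
// physical memory where the platform allows it (mmap on POSIX, VirtualAlloc
// on Windows, calloc as a last resort).
ARROW_EXPORT Result<uint8_t*> MapZeroInitialized(int64_t size);

// Release a mapping previously obtained from MapZeroInitialized.
ARROW_EXPORT Status UnmapZeroInitialized(uint8_t* data, int64_t size);

}  // namespace internal
}  // namespace arrow
```
The memory pool would then call these instead of using mmap()/munmap() directly, and the platform-specific `#ifdef`s would be confined to io_util.cc.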
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
Review comment:
Note that
[VirtualAlloc](https://docs.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-virtualalloc)
on Windows zero-initializes pages. We can fall back on `calloc` by default.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ // TODO: jemalloc and mimalloc support zero-initialized allocations as
+ // well, which might be faster than allocate + memset.
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
+ auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+ // If this buffer satisfies the requirements, return it.
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Acquire the lock for allocating a new buffer.
+ std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+ // Between our previous atomic load and acquisition of the lock, another
+ // thread may have allocated a buffer. So we need to check again.
+ current_buffer = atomic_load(&immutable_zeros_cache_);
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Let's now figure out a good size to allocate. This is done
+ // heuristically, with the following rules:
+ // - allocate at least the requested size (obviously);
+ // - allocate at least 2x the previous size;
+ // - allocate at least kMinAllocSize bytes (to avoid lots of small
+ // allocations).
+ static const int64_t kMinAllocSize = 4096;
+ int64_t alloc_size =
+ std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
Review comment:
Perhaps `bit_util::RoundUpToPowerOf2(size)` rather than `size`?
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ // TODO: jemalloc and mimalloc support zero-initialized allocations as
+ // well, which might be faster than allocate + memset.
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
+ auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+ // If this buffer satisfies the requirements, return it.
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Acquire the lock for allocating a new buffer.
+ std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+ // Between our previous atomic load and acquisition of the lock, another
+ // thread may have allocated a buffer. So we need to check again.
+ current_buffer = atomic_load(&immutable_zeros_cache_);
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Let's now figure out a good size to allocate. This is done
+ // heuristically, with the following rules:
+ // - allocate at least the requested size (obviously);
+ // - allocate at least 2x the previous size;
+ // - allocate at least kMinAllocSize bytes (to avoid lots of small
+ // allocations).
+ static const int64_t kMinAllocSize = 4096;
+ int64_t alloc_size =
+ std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
Review comment:
Perhaps `bit_util::RoundUpToPowerOf2(size)` rather than `size`?
Edit: perhaps that would be detrimental for large allocations. Nevermind.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ // TODO: jemalloc and mimalloc support zero-initialized allocations as
+ // well, which might be faster than allocate + memset.
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
+ auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+ // If this buffer satisfies the requirements, return it.
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Acquire the lock for allocating a new buffer.
+ std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+ // Between our previous atomic load and acquisition of the lock, another
+ // thread may have allocated a buffer. So we need to check again.
+ current_buffer = atomic_load(&immutable_zeros_cache_);
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Let's now figure out a good size to allocate. This is done
+ // heuristically, with the following rules:
+ // - allocate at least the requested size (obviously);
+ // - allocate at least 2x the previous size;
+ // - allocate at least kMinAllocSize bytes (to avoid lots of small
+ // allocations).
+ static const int64_t kMinAllocSize = 4096;
+ int64_t alloc_size =
+ std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+ // Attempt to allocate the block.
+ uint8_t* data = nullptr;
+ auto result = AllocateImmutableZeros(alloc_size, &data);
+
+ // If we fail to do so, fall back to trying to allocate the requested size
+ // exactly as a last-ditch effort.
+ if (!result.ok() || data == nullptr) {
Review comment:
Hmm, why is it possible to get `nullptr` here?
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ // TODO: jemalloc and mimalloc support zero-initialized allocations as
+ // well, which might be faster than allocate + memset.
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ munmap(buffer, size);
+ return;
+ }
+#endif
+ Free(buffer, size);
+ }
+
+ public:
+ Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+ // Thread-safely get the current largest buffer of zeros.
+ auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+ // If this buffer satisfies the requirements, return it.
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Acquire the lock for allocating a new buffer.
+ std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+ // Between our previous atomic load and acquisition of the lock, another
+ // thread may have allocated a buffer. So we need to check again.
+ current_buffer = atomic_load(&immutable_zeros_cache_);
+ if (current_buffer && current_buffer->size() >= size) {
+ return std::move(current_buffer);
+ }
+
+ // Let's now figure out a good size to allocate. This is done
+ // heuristically, with the following rules:
+ // - allocate at least the requested size (obviously);
+ // - allocate at least 2x the previous size;
+ // - allocate at least kMinAllocSize bytes (to avoid lots of small
+ // allocations).
+ static const int64_t kMinAllocSize = 4096;
+ int64_t alloc_size =
+ std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+ // Attempt to allocate the block.
+ uint8_t* data = nullptr;
+ auto result = AllocateImmutableZeros(alloc_size, &data);
+
+ // If we fail to do so, fall back to trying to allocate the requested size
+ // exactly as a last-ditch effort.
+ if (!result.ok() || data == nullptr) {
+ alloc_size = size;
+ RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data));
+ }
+ DCHECK_NE(data, nullptr);
+
+ // Move ownership of the data block into an ImmutableZeros object. It will
+ // free the block when destroyed, i.e. when all shared_ptr references to it
+ // are reset or go out of scope.
+ current_buffer = std::make_shared<ImmutableZeros>(data, alloc_size, this);
+
+ // Store a reference to the new block in the cache, so subsequent calls to
+ // this function (from this thread or from other threads) can use it, too.
+ atomic_store(&immutable_zeros_cache_, current_buffer);
+
+ return std::move(current_buffer);
+ }
+
+ void ReleaseUnused() override {
+ // Get rid of the ImmutableZeros cache if we're the only one using it. If
+ // there are other pieces of code using it, getting rid of the cache won't
+ // deallocate it anyway, so it's better to hold onto it.
+ {
+ auto cache = atomic_load(&immutable_zeros_cache_);
+
+ // Because we now have a copy in our thread, the use count will be 2 if
+ // nothing else is using it.
+ if (cache.use_count() <= 2) {
+ atomic_store(&immutable_zeros_cache_, std::shared_ptr<ImmutableZeros>());
Review comment:
Is it a problem if `immutable_zeros_cache_` was modified in the
meantime? Probably not, just checking.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
Review comment:
Note that
[VirtualAlloc](https://docs.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-virtualalloc)
on Windows zero-initializes pages. We can fall back on
[calloc](https://pubs.opengroup.org/onlinepubs/9699919799/functions/calloc.html)
by default.
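A minimal sketch of those two fallbacks (illustrative only; the wrapper name follows the hypothetical io_util helper suggested above, and error handling is omitted):
```c++
// Hypothetical platform branches inside io_util.cc.
#include <cstdint>
#if defined(_WIN32)
#include <windows.h>
uint8_t* MapZeroInitializedImpl(int64_t size) {
  // VirtualAlloc commits pages that are guaranteed to be zero-initialized;
  // PAGE_READONLY keeps the mapping immutable. Release with VirtualFree().
  return static_cast<uint8_t*>(VirtualAlloc(
      nullptr, static_cast<SIZE_T>(size), MEM_RESERVE | MEM_COMMIT, PAGE_READONLY));
}
#else
#include <cstdlib>
uint8_t* MapZeroInitializedImpl(int64_t size) {
  // Default fallback: calloc guarantees zeroed memory, though these pages are
  // writable and do consume physical memory once touched. Release with free().
  return static_cast<uint8_t*>(std::calloc(1, static_cast<size_t>(size)));
}
#endif
```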
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ // TODO: jemalloc and mimalloc support zero-initialized allocations as
+ // well, which might be faster than allocate + memset.
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
Review comment:
Same here: we can hide behind a compatibility function.
##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
stats_.UpdateAllocatedBytes(-size);
}
- void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+ virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+ if (size > 0) {
+ *out = static_cast<uint8_t*>(mmap(
+ nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+ if (*out == MAP_FAILED) {
+ auto err = errno;
+ return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+ strerror(err));
+ }
+ return Status::OK();
+ }
+#endif
+ // TODO: jemalloc and mimalloc support zero-initialized allocations as
+ // well, which might be faster than allocate + memset.
+ RETURN_NOT_OK(Allocate(size, out));
+ std::memset(*out, 0, size);
+ return Status::OK();
+ }
+
+ void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
Review comment:
Same here: we can hide this behind a compatibility function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]