pitrou commented on a change in pull request #12116:
URL: https://github.com/apache/arrow/pull/12116#discussion_r805997122



##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       I'm not sure this is useful, because the underlying allocator probably 
already relies on mmap for large-ish allocations.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       This algorithm seems quite complicated and costly (in terms of 
inter-thread synchronization).
   Instead, how about maintaining lazily-allocated power-of-two-sized entries 
(perhaps starting with 4096 to avoid having a ton of tiny size classes)? 
Something like:
   ```c++
   
   static constexpr int kMinPower2Size = 12;  // 4096 bytes
   static constexpr int kNumAllocClasses = 64 - kMinPower2Size;
   // Addresses of allocated zeros, for each size class
   std::array<std::atomic<uintptr_t>, kNumAllocClasses> allocated_;
   std::mutex allocation_mutex_;
   
   Result<const uint8_t*> GetImmutableZeros(int64_t size) override {
     const auto size_class = std::max(0, bit_util::Log2(size) - kMinPower2Size);
     auto addr = allocated_[size_class].load();
      if (ARROW_PREDICT_TRUE(addr != 0)) {
       // Fast path: already allocated
       return reinterpret_cast<const uint8_t*>(addr);
     }
     // Slow path: allocate if not already done
     std::lock_guard<std::mutex> lock(allocation_mutex_);
      addr = allocated_[size_class].load();
      if (addr == 0) {
        const int64_t alloc_size = static_cast<int64_t>(1) << (size_class + kMinPower2Size);
        ARROW_ASSIGN_OR_RAISE(const uint8_t* data, AllocateImmutableZeros(alloc_size));
       allocated_[size_class] = addr = reinterpret_cast<uintptr_t>(data);
     }
     return reinterpret_cast<const uint8_t*>(addr);
   }
   ```

##########
File path: cpp/src/arrow/device.h
##########
@@ -223,4 +236,55 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager {
 ARROW_EXPORT
 std::shared_ptr<MemoryManager> default_cpu_memory_manager();
 
+/// A memory manager that uses the immutable zeros interface of the given memory pool,
+/// rather than the normal mutable buffer interface.

Review comment:
       Hmm, I honestly don't understand why it would be useful to implement 
this, while we're already exposing additional methods to `MemoryPool`.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -536,6 +543,39 @@ int64_t MemoryPool::max_memory() const { return -1; }
 // MemoryPool implementation that delegates its core duty
 // to an Allocator class.
 
+class ImmutableZeros : public Buffer {
+ public:
+  explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool)
+      : Buffer(data, size, CPUDevice::immutable_zeros_memory_manager(pool)),
+        pool_(pool) {}
+
+  ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr) {}
+
+  ~ImmutableZeros() override;
+
+  // Prevent copies and handle moves explicitly to avoid double free

Review comment:
       It sounds hackish to have a separate ImmutableZeros class, IMHO.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       Ah, you're right, we don't know that the allocator returned us 
zero-allocated memory, unfortunately.
   (note that both jemalloc and mimalloc have zero-initializing allocation 
APIs, so we could use those)
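   
   For reference, a minimal sketch of those zero-initializing entry points (the helper
   names and the 64-byte alignment are illustrative, and only one of the two allocator
   backends would actually be compiled in at a time):
   ```c++
    #include <cstddef>

    #include <jemalloc/jemalloc.h>  // mallocx, dallocx, MALLOCX_ALIGN, MALLOCX_ZERO
    #include <mimalloc.h>           // mi_zalloc_aligned, mi_free

    // jemalloc: zero-initialized, 64-byte-aligned block in a single call,
    // avoiding a separate memset over the allocation.
    void* AllocZerosJemalloc(std::size_t size) {
      return mallocx(size, MALLOCX_ALIGN(64) | MALLOCX_ZERO);  // free with dallocx(p, 0)
    }

    // mimalloc: dedicated zero-initializing entry point.
    void* AllocZerosMimalloc(std::size_t size) {
      return mi_zalloc_aligned(size, 64);  // free with mi_free(p)
    }
   ```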

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       > An inability to free something, especially if that something is large, 
feels like bad news to me, so I'm hesitant to just copy your version in and 
call it a day.
   
   That is true. However, it shouldn't be a concern if we can ensure that the 
pages don't actually allocate physical memory (or almost none of it, such as 
`/dev/zero`).
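   
   As a tiny illustration of that idea (Linux/POSIX only; the helper name is made up):
   ```c++
    #include <cstddef>
    #include <fcntl.h>     // open, O_RDONLY
    #include <sys/mman.h>  // mmap, PROT_READ, MAP_PRIVATE, MAP_FAILED
    #include <unistd.h>    // close

    // Map `size` read-only bytes backed by /dev/zero. Since the pages can never
    // be written (PROT_READ), the kernel can satisfy every read fault from the
    // shared zero page, so essentially no physical memory is committed.
    void* MapDevZero(std::size_t size) {
      int fd = open("/dev/zero", O_RDONLY);
      if (fd < 0) return nullptr;
      void* addr = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd, 0);
      close(fd);  // the mapping keeps its own reference to the file
      return (addr == MAP_FAILED) ? nullptr : addr;
    }
   ```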

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       > * I'm not sure if it's even needed, though: it isn't if you can safely 
make a copy of a `shared_ptr` while another thread may be updating the 
contained pointer.
   
   You can safely update the object _pointed to_. However, changing the 
_pointer_ requires use of dedicated atomic access functions: 
https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       > * I'm not sure if it's even needed, though: it isn't if you can safely 
make a copy of a `shared_ptr` while another thread may be updating the 
contained pointer.
   
   You can safely update the object _pointed to_ (well, except that accessing 
that object from different threads then becomes racy). However, changing the 
_pointer_ requires use of dedicated atomic access functions: 
https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       > * I'm not sure if it's even needed, though: it isn't if you can safely 
make a copy of a `shared_ptr` while another thread may be updating the 
contained pointer.
   
   You can safely update the object _pointed to_ (well, except that accessing 
that object from different threads then becomes racy). However, changing the 
_pointer_ requires use of dedicated atomic access functions: 
https://en.cppreference.com/w/cpp/memory/shared_ptr/atomic
   
   I guess this means you should favour changing the pointer, since all 
accesses will be confined in this utility function.
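   
   For concreteness, a minimal sketch of those accessors (pre-C++20 free functions;
   the cache variable and helper names here are illustrative, not the PR's actual members):
   ```c++
    #include <memory>

    #include "arrow/buffer.h"

    // Shared cache; the *pointer itself* may be replaced by another thread,
    // so plain reads/assignments of the shared_ptr would be racy.
    std::shared_ptr<arrow::Buffer> zeros_cache_;

    std::shared_ptr<arrow::Buffer> LoadZerosCache() {
      // Atomically copies the shared_ptr (pointer + control block reference).
      return std::atomic_load(&zeros_cache_);
    }

    void StoreZerosCache(std::shared_ptr<arrow::Buffer> buf) {
      std::atomic_store(&zeros_cache_, std::move(buf));
    }
   ```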

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       @jvanstraten You're right. So let's keep it like this and just explain 
why it is in a comment.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       Can you hide the entire mmap() handling behind a compatibility wrapper 
in `arrow/util/io_util.{h,cc}`, so that we can easily add Windows and macOS 
support?
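   
   For illustration only, such a wrapper could be declared along these lines (the
   `MemoryMapZeros`/`MemoryUnmapZeros` names are made up, not existing `io_util` APIs):
   ```c++
    // Hypothetical additions to arrow/util/io_util.h
    #include <cstdint>

    #include "arrow/result.h"
    #include "arrow/status.h"
    #include "arrow/util/visibility.h"

    namespace arrow {
    namespace internal {

    // Return `size` read-only bytes of zeros, using an OS primitive where
    // available and a zero-initialized heap allocation otherwise.
    ARROW_EXPORT Result<uint8_t*> MemoryMapZeros(int64_t size);

    // Release a region previously obtained from MemoryMapZeros.
    ARROW_EXPORT Status MemoryUnmapZeros(uint8_t* data, int64_t size);

    }  // namespace internal
    }  // namespace arrow
   ```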
   

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       Note that 
[VirtualAlloc](https://docs.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-virtualalloc)
 on Windows zero-initializes pages. We can fall back on `calloc` by default.
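   
   As a rough sketch of the fallback chain such a wrapper could use (error handling
   and the matching release path are omitted; this is not the actual Arrow code):
   ```c++
    #include <cstdint>
    #include <cstdlib>  // std::calloc fallback

    #if defined(_WIN32)
    #include <windows.h>
    #elif defined(__linux__) || defined(__APPLE__)
    #include <sys/mman.h>
    #endif

    // Allocate `size` bytes of zeros that will only ever be read.
    uint8_t* AllocateZeroPages(int64_t size) {
    #if defined(_WIN32)
      // VirtualAlloc guarantees newly committed pages are zero-initialized.
      return static_cast<uint8_t*>(VirtualAlloc(nullptr, static_cast<size_t>(size),
                                                MEM_RESERVE | MEM_COMMIT, PAGE_READONLY));
    #elif defined(__linux__) || defined(__APPLE__)
      // Anonymous private mappings are zero-filled by the kernel.
      void* addr = mmap(nullptr, static_cast<size_t>(size), PROT_READ,
                        MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
      return (addr == MAP_FAILED) ? nullptr : static_cast<uint8_t*>(addr);
    #else
      // Portable fallback: calloc returns zero-initialized memory.
      return static_cast<uint8_t*>(std::calloc(static_cast<size_t>(size), 1));
    #endif
    }
   ```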

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);

Review comment:
       Perhaps `bit_util::RoundUpToPowerOf2(size)` rather than `size`?

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);

Review comment:
       Perhaps `bit_util::RoundUpToPowerOf2(size)` rather than `size`?
   
   Edit: perhaps that would be detrimental for large allocations. Nevermind.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+    // Attempt to allocate the block.
+    uint8_t* data = nullptr;
+    auto result = AllocateImmutableZeros(alloc_size, &data);
+
+    // If we fail to do so, fall back to trying to allocate the requested size
+    // exactly as a last-ditch effort.
+    if (!result.ok() || data == nullptr) {

Review comment:
       Hmm, why is it possible to get `nullptr` here?

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+    // Attempt to allocate the block.
+    uint8_t* data = nullptr;
+    auto result = AllocateImmutableZeros(alloc_size, &data);
+
+    // If we fail to do so, fall back to trying to allocate the requested size
+    // exactly as a last-ditch effort.
+    if (!result.ok() || data == nullptr) {
+      alloc_size = size;
+      RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data));
+    }
+    DCHECK_NE(data, nullptr);
+
+    // Move ownership of the data block into an ImmutableZeros object. It will
+    // free the block when destroyed, i.e. when all shared_ptr references to it
+    // are reset or go out of scope.
+    current_buffer = std::make_shared<ImmutableZeros>(data, alloc_size, this);
+
+    // Store a reference to the new block in the cache, so subsequent calls to
+    // this function (from this thread or from other threads) can use it, too.
+    atomic_store(&immutable_zeros_cache_, current_buffer);
+
+    return std::move(current_buffer);
+  }
+
+  void ReleaseUnused() override {
+    // Get rid of the ImmutableZeros cache if we're the only one using it. If
+    // there are other pieces of code using it, getting rid of the cache won't
+    // deallocate it anyway, so it's better to hold onto it.
+    {
+      auto cache = atomic_load(&immutable_zeros_cache_);
+
+      // Because we now have a copy in our thread, the use count will be 2 if
+      // nothing else is using it.
+      if (cache.use_count() <= 2) {
+        atomic_store(&immutable_zeros_cache_, std::shared_ptr<ImmutableZeros>());

Review comment:
       Is it a problem if `immutable_zeros_cache_` was modified in the 
meantime? Probably not, just checking.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       Note that 
[VirtualAlloc](https://docs.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-virtualalloc)
 on Windows zero-initializes pages. We can fall back on 
[calloc](https://pubs.opengroup.org/onlinepubs/9699919799/functions/calloc.html)
 by default.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS

Review comment:
       Same here: we can hide behind a compatibility function.

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS

Review comment:
       Same here: we can hide this behind a compatibility function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
