jvanstraten commented on a change in pull request #12116:
URL: https://github.com/apache/arrow/pull/12116#discussion_r806098292



##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       The difference is that, because of the flags passed to mmap, the kernel doesn't have to allocate (or clear) any physical memory at all, as opposed to the allocate + memset that the alternative implementation would do to satisfy mutability. You could allocate several terabytes of virtual memory with this call if you wanted to, and it would cost you zero physical bytes (page table structures aside). At least, it works that way on my kernel.
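   
   To make that concrete, here's a minimal standalone sketch (Linux-specific; the exact behavior also depends on the kernel's overcommit settings):
   
```cpp
// Sketch: map 1 TiB of read-only anonymous zero pages. All pages alias the
// kernel's shared zero page on first read; no physical memory is committed
// unless a page is written, which PROT_READ forbids anyway.
#include <sys/mman.h>

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
  const std::size_t size = std::size_t(1) << 40;  // 1 TiB of virtual address space
  void* p = mmap(nullptr, size, PROT_READ,
                 MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0);
  if (p == MAP_FAILED) {
    perror("mmap");
    return 1;
  }
  std::printf("first byte: %d\n", static_cast<const uint8_t*>(p)[0]);  // prints 0
  munmap(p, size);
  return 0;
}
```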

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       Compared to my algorithm:
   
    - `+` In terms of thread primitives, your fast path only involves a memory fence, whereas mine involves a mutex. I'm not sure the mutex is even needed, though: it isn't if you can safely make a copy of a `shared_ptr` while another thread may be updating the contained pointer. That feels like a true statement, at least on x86, but I couldn't figure it out for sure from the C++ docs. If I can remove it, my fast path is just a `shared_ptr` copy (so, an atomic increment), a null check, and a size check, which I'm pretty sure is the fastest approach that still supports reference counting for deallocation.
    - `+` Your version doesn't allocate unnecessarily small buffers.
    - `+` Your version is more readable, especially given how unnecessarily cryptically I wrote the reallocation logic.
    - `-` Your version has no way to free buffers, so I would argue that it leaks memory. Granted, the leak is bounded by a bit less than 2x the next-larger power of two of the largest buffer allocated, so it won't grow without bound. My version, by comparison, releases smaller buffers when they are no longer used, and frees its cache when `ReleaseUnused()` is called and there are no other users. I also considered a version where the cache is a `weak_ptr`, in which case `ReleaseUnused()` would not be needed (a minimal sketch of that variant follows after this comment), but decided against it mostly because `ReleaseUnused()` already existed.
    - `-` Nit, but your version allocates small buffers regardless of whether a larger buffer is already available, whereas my version returns the largest buffer allocated thus far, and automatically frees previously allocated smaller buffers when all their users go out of scope.
    - `-` Also kind of a nit, but rounding up to power-of-two-sized buffers means that you might throw an out-of-memory error even though almost half of the allocation isn't actually needed. My algorithm backs off and allocates only as much as is needed if the 2x-previous-size allocation fails.
   
   An inability to free something, especially if that something is large, feels 
like bad news to me, so I'm hesitant to just copy your version in and call it a 
day. But if nothing else, I'll add a lower bound for allocation size and try to 
rewrite the allocation algorithm to be less cryptic tomorrow.
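   
   For reference, a minimal sketch of the `weak_ptr` variant mentioned above (hypothetical names, with a plain `std::vector` standing in for the real buffer; not the code in this PR):
   
```cpp
// Sketch: a cache that never keeps the buffer alive by itself, so no
// ReleaseUnused() hook is needed; the buffer is freed automatically when
// the last user drops its shared_ptr.
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

class ZeroBufferCache {
 public:
  std::shared_ptr<const std::vector<uint8_t>> Get(std::size_t size) {
    std::lock_guard<std::mutex> lock(mutex_);
    // Try to promote the weak reference; this returns null if all previous
    // users have already released their copies.
    auto buffer = cache_.lock();
    if (!buffer || buffer->size() < size) {
      // vector value-initializes its elements, i.e. the bytes are zeroed.
      buffer = std::make_shared<const std::vector<uint8_t>>(size);
      cache_ = buffer;  // weak: dropped together with the last user
    }
    return buffer;
  }

 private:
  std::mutex mutex_;
  std::weak_ptr<const std::vector<uint8_t>> cache_;
};
```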

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -536,6 +543,39 @@ int64_t MemoryPool::max_memory() const { return -1; }
 // MemoryPool implementation that delegates its core duty
 // to an Allocator class.
 
+class ImmutableZeros : public Buffer {
+ public:
+  explicit ImmutableZeros(uint8_t* data, int64_t size, MemoryPool* pool)
+      : Buffer(data, size, CPUDevice::immutable_zeros_memory_manager(pool)),
+        pool_(pool) {}
+
+  ImmutableZeros() : Buffer(nullptr, 0), pool_(nullptr) {}
+
+  ~ImmutableZeros() override;
+
+  // Prevent copies and handle moves explicitly to avoid double free

Review comment:
       I'm not sure what you mean by that. It's analogous to `PoolBuffer` in that it uses RAII to free the memory when it is no longer needed, except that two `Buffer` classes are needed to correctly model shared ownership: `ImmutableZeros` models the data, while `ImmutableZerosPoolBuffer` models a shared reference to that data (even in contexts where a `unique_ptr` or raw pointer/reference to a `Buffer` is needed). Writing it down like this, though, the names could use some refactoring, especially now that they both implement `Buffer` (see the illustration below).

##########
File path: cpp/src/arrow/device.h
##########
@@ -223,4 +236,55 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager {
 ARROW_EXPORT
 std::shared_ptr<MemoryManager> default_cpu_memory_manager();
 
+/// A memory manager that uses the immutable zeros interface of the given memory pool,
+/// rather than the normal mutable buffer interface.

Review comment:
       In my mind, if a memory manager is associated with a buffer, it should be safe to assume that that memory manager actually *is* managing that buffer; at the very least, it should be possible to create a buffer with the same allocation behavior by calling the associated memory manager's `AllocateBuffer` method. I ended up implementing this special memory manager because that otherwise wouldn't be the case for these buffers.
   
   In general, I'm too new to the project to have a good understanding of why there are so many layers of abstraction and methods for creating a buffer and doing memory management... but surely there *are* good reasons for them, so wouldn't implementing them inconsistently lead to issues down the line?
   
   I will yield to the idea that a lot of this complexity can be removed if never being able to safely free these buffers is acceptable, because then they'd just become like any other `Buffer` that has no ownership information associated with it. But IMO that's a slippery slope at best.
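   
   A self-contained sketch of the invariant I mean (heavily simplified hypothetical types, not Arrow's actual interfaces):
   
```cpp
// Sketch: a buffer handed out by GetImmutableZeros should carry a manager
// whose AllocateBuffer reproduces the same allocation behavior, rather than
// the normal mutable allocation path.
#include <cstdint>
#include <memory>

struct Buffer {};  // stands in for arrow::Buffer

struct Pool {
  // Stands in for the pool's immutable-zeros interface from this PR.
  std::shared_ptr<Buffer> GetImmutableZeros(int64_t) {
    return std::make_shared<Buffer>();
  }
};

struct MemoryManager {
  virtual ~MemoryManager() = default;
  virtual std::shared_ptr<Buffer> AllocateBuffer(int64_t size) = 0;
};

struct ImmutableZerosManager : MemoryManager {
  explicit ImmutableZerosManager(Pool* pool) : pool_(pool) {}
  std::shared_ptr<Buffer> AllocateBuffer(int64_t size) override {
    // Same allocation path as the buffers this manager is attached to.
    return pool_->GetImmutableZeros(size);
  }
  Pool* pool_;
};
```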

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));

Review comment:
       In fact, `Allocate()` generally does not return zero-initialized memory, due to the poisoning mechanic. I'd rather not dive into that rabbit hole right now as well, so I just added a TODO comment in the code. I can file a follow-up JIRA for it too, if you want.
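   
   For the record, the entry points I mean are jemalloc's `mallocx(size, MALLOCX_ZERO)` and mimalloc's `mi_zalloc(size)`. A rough sketch of what the TODO could turn into (the `#ifdef` names here are illustrative, not Arrow's real build flags):
   
```cpp
#include <cstdlib>
#include <cstring>

#if defined(USE_JEMALLOC)
#include <jemalloc/jemalloc.h>
#elif defined(USE_MIMALLOC)
#include <mimalloc.h>
#endif

// Sketch: ask the allocator for zeroed memory directly, which can be cheaper
// than allocate + memset when the OS hands out pre-zeroed pages anyway.
void* AllocateZeroed(std::size_t size) {
#if defined(USE_JEMALLOC)
  return mallocx(size, MALLOCX_ZERO);  // jemalloc's zero-initializing flag
#elif defined(USE_MIMALLOC)
  return mi_zalloc(size);  // mimalloc's zero-initializing counterpart of mi_malloc
#else
  // Portable fallback: plain allocate + memset.
  void* p = std::malloc(size);
  if (p != nullptr) std::memset(p, 0, size);
  return p;
#endif
}
```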

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,109 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.

Review comment:
       > That is true. However, it shouldn't be a concern if we can ensure that the pages don't actually allocate physical memory (or almost none of it, such as `/dev/zero`).
   
   I don't think we can in general, unfortunately. I would have no idea how to do it on something as ubiquitous as Windows (or whether it can be done there at all), and I'm sure there are more exotic operating systems and architectures that simply can't do it. Also, for 32-bit systems/builds (if Arrow supports those), virtual memory is in relatively short supply.
   
   > However, changing the pointer requires use of dedicated atomic access functions: [...]
   
   Ah, great, those functions are exactly what I had missed!
   
   I improved my allocation algorithm accordingly here: e628688
   
   ETA: and d89c297 (old habits die hard)
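   
   For anyone following along, those are the pre-C++20 atomic free functions for `shared_ptr` (C++20 spells this `std::atomic<std::shared_ptr<T>>`). A minimal sketch:
   
```cpp
#include <memory>
#include <utility>

std::shared_ptr<int> cache;  // shared between threads

std::shared_ptr<int> Reader() {
  // Atomic snapshot of the pointer; safe even while another thread is
  // concurrently storing a new value.
  return std::atomic_load(&cache);
}

void Writer(std::shared_ptr<int> fresh) {
  // Atomically publish the new pointer; the old object is freed only when
  // the last reader's copy goes out of scope.
  std::atomic_store(&cache, std::move(fresh));
}
```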

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+    // Attempt to allocate the block.
+    uint8_t* data = nullptr;
+    auto result = AllocateImmutableZeros(alloc_size, &data);
+
+    // If we fail to do so, fall back to trying to allocate the requested size
+    // exactly as a last-ditch effort.
+    if (!result.ok() || data == nullptr) {

Review comment:
       You shouldn't be able to in the default implementation, but `AllocateImmutableZeros` is virtual, so someone could override it in a custom implementation. Still, I suppose I should have gotten rid of that. As a first pass I had it properly return an error when `data` was set to null or left unmodified, then changed my mind and used a `DCHECK` instead. With that change, the null check there indeed doesn't do anything useful anymore. 3dda474

##########
File path: cpp/src/arrow/memory_pool.cc
##########
@@ -603,14 +643,116 @@ class BaseMemoryPoolImpl : public MemoryPool {
     stats_.UpdateAllocatedBytes(-size);
   }
 
-  void ReleaseUnused() override { Allocator::ReleaseUnused(); }
+ protected:
+  virtual Status AllocateImmutableZeros(int64_t size, uint8_t** out) {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      *out = static_cast<uint8_t*>(mmap(
+          nullptr, size, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
+      if (*out == MAP_FAILED) {
+        auto err = errno;
+        return Status::OutOfMemory("Failed to allocate zero buffer of size ", size, ": ",
+                                   strerror(err));
+      }
+      return Status::OK();
+    }
+#endif
+    // TODO: jemalloc and mimalloc support zero-initialized allocations as
+    //  well, which might be faster than allocate + memset.
+    RETURN_NOT_OK(Allocate(size, out));
+    std::memset(*out, 0, size);
+    return Status::OK();
+  }
+
+  void FreeImmutableZeros(uint8_t* buffer, int64_t size) override {
+#ifdef USE_MMAP_FOR_IMMUTABLE_ZEROS
+    if (size > 0) {
+      munmap(buffer, size);
+      return;
+    }
+#endif
+    Free(buffer, size);
+  }
+
+ public:
+  Result<std::shared_ptr<Buffer>> GetImmutableZeros(int64_t size) override {
+    // Thread-safely get the current largest buffer of zeros.
+    auto current_buffer = atomic_load(&immutable_zeros_cache_);
+
+    // If this buffer satisfies the requirements, return it.
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Acquire the lock for allocating a new buffer.
+    std::lock_guard<std::mutex> gg(immutable_zeros_mutex_);
+
+    // Between our previous atomic load and acquisition of the lock, another
+    // thread may have allocated a buffer. So we need to check again.
+    current_buffer = atomic_load(&immutable_zeros_cache_);
+    if (current_buffer && current_buffer->size() >= size) {
+      return std::move(current_buffer);
+    }
+
+    // Let's now figure out a good size to allocate. This is done
+    // heuristically, with the following rules:
+    //  - allocate at least the requested size (obviously);
+    //  - allocate at least 2x the previous size;
+    //  - allocate at least kMinAllocSize bytes (to avoid lots of small
+    //    allocations).
+    static const int64_t kMinAllocSize = 4096;
+    int64_t alloc_size =
+        std::max(size, current_buffer ? (current_buffer->size() * 2) : kMinAllocSize);
+
+    // Attempt to allocate the block.
+    uint8_t* data = nullptr;
+    auto result = AllocateImmutableZeros(alloc_size, &data);
+
+    // If we fail to do so, fall back to trying to allocate the requested size
+    // exactly as a last-ditch effort.
+    if (!result.ok() || data == nullptr) {
+      alloc_size = size;
+      RETURN_NOT_OK(AllocateImmutableZeros(alloc_size, &data));
+    }
+    DCHECK_NE(data, nullptr);
+
+    // Move ownership of the data block into an ImmutableZeros object. It will
+    // free the block when destroyed, i.e. when all shared_ptr references to it
+    // are reset or go out of scope.
+    current_buffer = std::make_shared<ImmutableZeros>(data, alloc_size, this);
+
+    // Store a reference to the new block in the cache, so subsequent calls to
+    // this function (from this thread or from other threads) can use it, too.
+    atomic_store(&immutable_zeros_cache_, current_buffer);
+
+    return std::move(current_buffer);
+  }
+
+  void ReleaseUnused() override {
+    // Get rid of the ImmutableZeros cache if we're the only one using it. If
+    // there are other pieces of code using it, getting rid of the cache won't
+    // deallocate it anyway, so it's better to hold onto it.
+    {
+      auto cache = atomic_load(&immutable_zeros_cache_);
+
+      // Because we now have a copy in our thread, the use count will be 2 if
+      // nothing else is using it.
+      if (cache.use_count() <= 2) {
+        atomic_store(&immutable_zeros_cache_, std::shared_ptr<ImmutableZeros>());

Review comment:
       It's not entirely thread-safe, in the sense that if other threads are using the cache while `ReleaseUnused()` is called, `ReleaseUnused()` may or may not flush the cache due to a race on `use_count()` (see the sketch below). But nothing breaks in either case, and `ReleaseUnused()` is already documented to be best-effort.
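   
   To make the benign race concrete, a contrived sketch (not Arrow code):
   
```cpp
// Sketch: use_count() is only a snapshot, so the "are we the last user?"
// check can be stale by the time we act on it. The worst case is keeping
// or dropping the cache unnecessarily, never a use-after-free.
#include <memory>

std::shared_ptr<int> cache = std::make_shared<int>(0);

void ReleaseUnused() {
  auto local = std::atomic_load(&cache);
  // use_count() == 2 means "the cache's reference plus our local copy"...
  // unless another thread copies the pointer between this check and the
  // store below.
  if (local && local.use_count() <= 2) {
    std::atomic_store(&cache, std::shared_ptr<int>());
    // Even if we raced, `local` and any other outstanding copies keep the
    // object alive; we only dropped the cache's own reference.
  }
}
```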




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

