This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3349e8d965 [GLUTEN-11485][VL] Fix the race condition in 
ArrowMemoryPool (#11493)
3349e8d965 is described below

commit 3349e8d9653cbea056b2dbb5ff3fb721a787d60f
Author: Rong Ma <[email protected]>
AuthorDate: Thu Jan 29 01:58:34 2026 +0000

    [GLUTEN-11485][VL] Fix the race condition in ArrowMemoryPool (#11493)
    
    * fix and simplify
    
    * fix
---
 cpp/core/memory/ArrowMemoryPool.cc     |  6 ------
 cpp/core/memory/ArrowMemoryPool.h      |  9 +++------
 cpp/velox/memory/VeloxMemoryManager.cc | 28 +++++++++++++---------------
 cpp/velox/memory/VeloxMemoryManager.h  |  2 --
 4 files changed, 16 insertions(+), 29 deletions(-)

diff --git a/cpp/core/memory/ArrowMemoryPool.cc 
b/cpp/core/memory/ArrowMemoryPool.cc
index 83a511cf8f..41640fc6e0 100644
--- a/cpp/core/memory/ArrowMemoryPool.cc
+++ b/cpp/core/memory/ArrowMemoryPool.cc
@@ -20,12 +20,6 @@
 
 namespace gluten {
 
-ArrowMemoryPool::~ArrowMemoryPool() {
-  if (releaser_ != nullptr) {
-    releaser_(this);
-  }
-}
-
 arrow::Status ArrowMemoryPool::Allocate(int64_t size, int64_t alignment, 
uint8_t** out) {
   if (!allocator_->allocateAligned(alignment, size, 
reinterpret_cast<void**>(out))) {
     return arrow::Status::Invalid("WrappedMemoryPool: Error allocating " + 
std::to_string(size) + " bytes");
diff --git a/cpp/core/memory/ArrowMemoryPool.h 
b/cpp/core/memory/ArrowMemoryPool.h
index 841244ec6b..8cf5c71f0e 100644
--- a/cpp/core/memory/ArrowMemoryPool.h
+++ b/cpp/core/memory/ArrowMemoryPool.h
@@ -30,11 +30,10 @@ using ArrowMemoryPoolReleaser = 
std::function<void(arrow::MemoryPool*)>;
 class ArrowMemoryPool final : public arrow::MemoryPool {
  public:
   // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init, hicpp-member-init)
-  explicit ArrowMemoryPool(AllocationListener* listener, 
ArrowMemoryPoolReleaser releaser = nullptr)
-      : 
allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
 listener)),
-        releaser_(std::move(releaser)) {}
+  explicit ArrowMemoryPool(AllocationListener* listener)
+      : 
allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
 listener)) {}
 
-  ~ArrowMemoryPool() override;
+  ~ArrowMemoryPool() override = default;
 
   ArrowMemoryPool(const ArrowMemoryPool&) = delete;
 
@@ -64,8 +63,6 @@ class ArrowMemoryPool final : public arrow::MemoryPool {
 
  private:
   std::unique_ptr<MemoryAllocator> allocator_ = nullptr;
-
-  ArrowMemoryPoolReleaser releaser_;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 54e442dfe2..d829516e0d 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -294,10 +294,13 @@ MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(
   return stats;
 }
 
-void logMemoryUsageStats(MemoryUsageStats stats, const std::string& name, 
const std::string& logPrefix, std::stringstream& ss) {
-  ss << logPrefix << "+- " << name
-     << " (used: " << velox::succinctBytes(stats.current())
-     << ", peak: " <<  velox::succinctBytes(stats.peak()) << ")\n";
+void logMemoryUsageStats(
+    MemoryUsageStats stats,
+    const std::string& name,
+    const std::string& logPrefix,
+    std::stringstream& ss) {
+  ss << logPrefix << "+- " << name << " (used: " << 
velox::succinctBytes(stats.current())
+     << ", peak: " << velox::succinctBytes(stats.peak()) << ")\n";
   if (stats.children_size() > 0) {
     for (auto it = stats.children().begin(); it != stats.children().end(); 
++it) {
       logMemoryUsageStats(it->second, it->first, logPrefix + "   ", ss);
@@ -327,22 +330,17 @@ int64_t 
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
 std::shared_ptr<arrow::MemoryPool> 
VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
   std::lock_guard<std::mutex> l(mutex_);
   if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
-    auto pool = it->second.lock();
-    VELOX_CHECK_NOT_NULL(pool, "Arrow memory pool {} has been destructed", 
name);
-    return pool;
+    if (auto pool = it->second.lock()) {
+      return pool;
+    }
+    arrowPools_.erase(name);
   }
-  auto pool = std::make_shared<ArrowMemoryPool>(
-      blockListener_.get(), [this, name](arrow::MemoryPool* pool) { 
this->dropMemoryPool(name); });
+
+  auto pool = std::make_shared<ArrowMemoryPool>(blockListener_.get());
   arrowPools_.emplace(name, pool);
   return pool;
 }
 
-void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
-  std::lock_guard<std::mutex> l(mutex_);
-  const auto ret = arrowPools_.erase(name);
-  VELOX_CHECK_EQ(ret, 1, "Child memory pool {} doesn't exist", name);
-}
-
 const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
   MemoryUsageStats stats;
   stats.set_current(listener_->currentBytes());
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index 570d1ec43a..9d4b9e9a9b 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -103,8 +103,6 @@ class VeloxMemoryManager final : public MemoryManager {
  private:
   bool tryDestructSafe();
 
-  void dropMemoryPool(const std::string& name);
-
   std::unique_ptr<AllocationListener> listener_;
   std::unique_ptr<AllocationListener> blockListener_;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to