This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 53858ec3d [GLUTEN-5630][VL] Decrease peak memory by taking freeBytes 
into account  (#5635)
53858ec3d is described below

commit 53858ec3ded3cee2e675a1f729bee37718b57dd1
Author: Yang Zhang <[email protected]>
AuthorDate: Thu May 9 13:45:17 2024 +0800

    [GLUTEN-5630][VL] Decrease peak memory by taking freeBytes into account  
(#5635)
---
 cpp/core/jni/JniCommon.h               | 18 +++++++++-----
 cpp/core/memory/AllocationListener.h   |  8 +++++++
 cpp/core/memory/HbwAllocator.cc        |  4 ++++
 cpp/core/memory/HbwAllocator.h         |  2 ++
 cpp/core/memory/MemoryAllocator.cc     | 13 ++++++++++
 cpp/core/memory/MemoryAllocator.h      |  7 ++++++
 cpp/velox/memory/VeloxMemoryManager.cc | 44 ++++++++++++++++++++++------------
 7 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index bda5fc1df..29c38689c 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -339,6 +339,14 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
     updateReservation(size);
   }
 
+  int64_t currentBytes() override {
+    return bytesReserved_;
+  }
+
+  int64_t peakBytes() override {
+    return maxBytesReserved_;
+  }
+
  private:
   int64_t reserve(int64_t diff) {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -352,9 +360,7 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
     }
     int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
     blocksReserved_ = newBlockCount;
-    if (bytesReserved_ > maxBytesReserved_) {
-      maxBytesReserved_ = bytesReserved_;
-    }
+    maxBytesReserved_ = std::max(maxBytesReserved_, bytesReserved_);
     return bytesGranted;
   }
 
@@ -368,10 +374,10 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
     if (granted < 0) {
       env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -granted);
       checkException(env);
-      return;
+    } else {
+      env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, granted);
+      checkException(env);
     }
-    env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, granted);
-    checkException(env);
   }
 
   JavaVM* vm_;
diff --git a/cpp/core/memory/AllocationListener.h 
b/cpp/core/memory/AllocationListener.h
index 04290c8c3..23015e1a0 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -32,6 +32,14 @@ class AllocationListener {
   // Value of diff can be either positive or negative
   virtual void allocationChanged(int64_t diff) = 0;
 
+  virtual int64_t currentBytes() {
+    return 0;
+  }
+
+  virtual int64_t peakBytes() {
+    return 0;
+  }
+
  protected:
   AllocationListener() = default;
 };
diff --git a/cpp/core/memory/HbwAllocator.cc b/cpp/core/memory/HbwAllocator.cc
index 9f2c5d6b6..ef0dc82b8 100644
--- a/cpp/core/memory/HbwAllocator.cc
+++ b/cpp/core/memory/HbwAllocator.cc
@@ -85,4 +85,8 @@ int64_t HbwMemoryAllocator::getBytes() const {
   return bytes_;
 }
 
+int64_t HbwMemoryAllocator::peakBytes() const {
+  return 0;
+}
+
 } // namespace gluten
diff --git a/cpp/core/memory/HbwAllocator.h b/cpp/core/memory/HbwAllocator.h
index 461f25682..e50a71bd5 100644
--- a/cpp/core/memory/HbwAllocator.h
+++ b/cpp/core/memory/HbwAllocator.h
@@ -39,6 +39,8 @@ class HbwMemoryAllocator final : public MemoryAllocator {
 
   int64_t getBytes() const override;
 
+  int64_t peakBytes() const override;
+
  private:
   std::atomic_int64_t bytes_{0};
 };
diff --git a/cpp/core/memory/MemoryAllocator.cc 
b/cpp/core/memory/MemoryAllocator.cc
index 218534cd8..6bcb9926e 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -29,6 +29,7 @@ bool ListenableMemoryAllocator::allocate(int64_t size, void** 
out) {
   }
   if (succeed) {
     bytes_ += size;
+    peakBytes_ = std::max(peakBytes_, bytes_.load());
   }
   return succeed;
 }
@@ -41,6 +42,7 @@ bool ListenableMemoryAllocator::allocateZeroFilled(int64_t 
nmemb, int64_t size,
   }
   if (succeed) {
     bytes_ += size * nmemb;
+    peakBytes_ = std::max(peakBytes_, bytes_.load());
   }
   return succeed;
 }
@@ -53,6 +55,7 @@ bool ListenableMemoryAllocator::allocateAligned(uint64_t 
alignment, int64_t size
   }
   if (succeed) {
     bytes_ += size;
+    peakBytes_ = std::max(peakBytes_, bytes_.load());
   }
   return succeed;
 }
@@ -66,6 +69,7 @@ bool ListenableMemoryAllocator::reallocate(void* p, int64_t 
size, int64_t newSiz
   }
   if (succeed) {
     bytes_ += diff;
+    peakBytes_ = std::max(peakBytes_, bytes_.load());
   }
   return succeed;
 }
@@ -84,6 +88,7 @@ bool ListenableMemoryAllocator::reallocateAligned(
   }
   if (succeed) {
     bytes_ += diff;
+    peakBytes_ = std::max(peakBytes_, bytes_.load());
   }
   return succeed;
 }
@@ -104,6 +109,10 @@ int64_t ListenableMemoryAllocator::getBytes() const {
   return bytes_;
 }
 
+int64_t ListenableMemoryAllocator::peakBytes() const {
+  return peakBytes_;
+}
+
 bool StdMemoryAllocator::allocate(int64_t size, void** out) {
   *out = std::malloc(size);
   bytes_ += size;
@@ -160,6 +169,10 @@ int64_t StdMemoryAllocator::getBytes() const {
   return bytes_;
 }
 
+int64_t StdMemoryAllocator::peakBytes() const {
+  return 0;
+}
+
 std::shared_ptr<MemoryAllocator> defaultMemoryAllocator() {
 #if defined(GLUTEN_ENABLE_HBM)
   static std::shared_ptr<MemoryAllocator> alloc = 
HbwMemoryAllocator::newInstance();
diff --git a/cpp/core/memory/MemoryAllocator.h 
b/cpp/core/memory/MemoryAllocator.h
index f9d5948fb..a322c9190 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -41,6 +41,8 @@ class MemoryAllocator {
   virtual bool free(void* p, int64_t size) = 0;
 
   virtual int64_t getBytes() const = 0;
+
+  virtual int64_t peakBytes() const = 0;
 };
 
 class ListenableMemoryAllocator final : public MemoryAllocator {
@@ -63,10 +65,13 @@ class ListenableMemoryAllocator final : public 
MemoryAllocator {
 
   int64_t getBytes() const override;
 
+  int64_t peakBytes() const override;
+
  private:
   MemoryAllocator* delegated_;
   AllocationListener* listener_;
   std::atomic_int64_t bytes_{0};
+  int64_t peakBytes_{0};
 };
 
 class StdMemoryAllocator final : public MemoryAllocator {
@@ -85,6 +90,8 @@ class StdMemoryAllocator final : public MemoryAllocator {
 
   int64_t getBytes() const override;
 
+  int64_t peakBytes() const override;
+
  private:
   std::atomic_int64_t bytes_{0};
 };
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 93eb93f6b..49edba4c4 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -40,11 +40,8 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   }
 
   uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) 
override {
-    if (targetBytes == 0) {
-      return 0;
-    }
-    std::lock_guard<std::recursive_mutex> l(mutex_);
-    return growPoolLocked(pool, targetBytes);
+    VELOX_CHECK_EQ(targetBytes, 0, "Gluten has set 
MemoryManagerOptions.memoryPoolInitCapacity to 0")
+    return 0;
   }
 
   uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t 
targetBytes) override {
@@ -56,12 +53,11 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
       velox::memory::MemoryPool* pool,
       const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& 
candidatePools,
       uint64_t targetBytes) override {
-    GLUTEN_CHECK(candidatePools.size() == 1, "ListenableArbitrator should only 
be used within a single root pool");
+    VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only 
be used within a single root pool")
     auto candidate = candidatePools.back();
-    GLUTEN_CHECK(pool->root() == candidate.get(), "Illegal state in 
ListenableArbitrator");
-    {
+    VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in 
ListenableArbitrator") {
       std::lock_guard<std::recursive_mutex> l(mutex_);
-      growPoolLocked(pool, targetBytes);
+      growPoolLocked(pool->root(), targetBytes);
     }
     return true;
   }
@@ -72,7 +68,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
       bool allowSpill,
       bool allowAbort) override {
     facebook::velox::exec::MemoryReclaimer::Stats status;
-    GLUTEN_CHECK(pools.size() == 1, "Should shrink a single pool at a time");
+    VELOX_CHECK_EQ(pools.size(), 1, "Gluten only has one root pool");
     std::lock_guard<std::recursive_mutex> l(mutex_); // FIXME: Do we have 
recursive locking for this mutex?
     auto pool = pools.at(0);
     const uint64_t oldCapacity = pool->capacity();
@@ -107,8 +103,12 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
               " bytes although there is enough space, free bytes: " + 
std::to_string(freeBytes));
       return 0;
     }
-    listener_->allocationChanged(bytes);
-    return pool->grow(bytes, bytes);
+    auto reclaimedFreeBytes = pool->shrink(0);
+    auto neededBytes = bytes - reclaimedFreeBytes;
+    listener_->allocationChanged(neededBytes);
+    auto ret = pool->grow(bytes, bytes);
+    VELOX_CHECK(ret, "{} failed to grow {} bytes", pool->name(), 
velox::succinctBytes(bytes))
+    return ret;
   }
 
   uint64_t releaseMemoryLocked(velox::memory::MemoryPool* pool, uint64_t 
bytes) {
@@ -179,18 +179,25 @@ VeloxMemoryManager::VeloxMemoryManager(
 }
 
 namespace {
-MemoryUsageStats collectMemoryUsageStatsInternal(const 
velox::memory::MemoryPool* pool) {
+MemoryUsageStats collectVeloxMemoryUsageStats(const velox::memory::MemoryPool* 
pool) {
   MemoryUsageStats stats;
   stats.set_current(pool->currentBytes());
   stats.set_peak(pool->peakBytes());
   // walk down root and all children
   pool->visitChildren([&](velox::memory::MemoryPool* pool) -> bool {
-    stats.mutable_children()->emplace(pool->name(), 
collectMemoryUsageStatsInternal(pool));
+    stats.mutable_children()->emplace(pool->name(), 
collectVeloxMemoryUsageStats(pool));
     return true;
   });
   return stats;
 }
 
+MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(const MemoryAllocator* 
allocator) {
+  MemoryUsageStats stats;
+  stats.set_current(allocator->getBytes());
+  stats.set_peak(allocator->peakBytes());
+  return stats;
+}
+
 int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, 
velox::memory::MemoryPool* pool, int64_t size) {
   std::string poolName{pool->root()->name() + "/" + pool->name()};
   std::string logPrefix{"Shrink[" + poolName + "]: "};
@@ -208,7 +215,14 @@ int64_t 
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
 } // namespace
 
 const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
-  return collectMemoryUsageStatsInternal(veloxAggregatePool_.get());
+  MemoryUsageStats stats;
+  stats.set_current(listener_->currentBytes());
+  stats.set_peak(listener_->peakBytes());
+  stats.mutable_children()->emplace(
+      "gluten::MemoryAllocator", 
collectGlutenAllocatorMemoryUsageStats(glutenAlloc_.get()));
+  stats.mutable_children()->emplace(
+      veloxAggregatePool_->name(), 
collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
+  return stats;
 }
 
 const int64_t VeloxMemoryManager::shrink(int64_t size) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to