This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 53858ec3d [GLUTEN-5630][VL] Decrease peak memory by taking freeBytes into account (#5635)
53858ec3d is described below
commit 53858ec3ded3cee2e675a1f729bee37718b57dd1
Author: Yang Zhang <[email protected]>
AuthorDate: Thu May 9 13:45:17 2024 +0800
[GLUTEN-5630][VL] Decrease peak memory by taking freeBytes into account (#5635)
---
cpp/core/jni/JniCommon.h | 18 +++++++++-----
cpp/core/memory/AllocationListener.h | 8 +++++++
cpp/core/memory/HbwAllocator.cc | 4 ++++
cpp/core/memory/HbwAllocator.h | 2 ++
cpp/core/memory/MemoryAllocator.cc | 13 ++++++++++
cpp/core/memory/MemoryAllocator.h | 7 ++++++
cpp/velox/memory/VeloxMemoryManager.cc | 44 ++++++++++++++++++++++------------
7 files changed, 75 insertions(+), 21 deletions(-)
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index bda5fc1df..29c38689c 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -339,6 +339,14 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
updateReservation(size);
}
+ int64_t currentBytes() override {
+ return bytesReserved_;
+ }
+
+ int64_t peakBytes() override {
+ return maxBytesReserved_;
+ }
+
private:
int64_t reserve(int64_t diff) {
std::lock_guard<std::mutex> lock(mutex_);
@@ -352,9 +360,7 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
}
int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
blocksReserved_ = newBlockCount;
- if (bytesReserved_ > maxBytesReserved_) {
- maxBytesReserved_ = bytesReserved_;
- }
+ maxBytesReserved_ = std::max(maxBytesReserved_, bytesReserved_);
return bytesGranted;
}
@@ -368,10 +374,10 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
if (granted < 0) {
env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -granted);
checkException(env);
- return;
+ } else {
+ env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, granted);
+ checkException(env);
}
- env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, granted);
- checkException(env);
}
JavaVM* vm_;
diff --git a/cpp/core/memory/AllocationListener.h
b/cpp/core/memory/AllocationListener.h
index 04290c8c3..23015e1a0 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -32,6 +32,14 @@ class AllocationListener {
// Value of diff can be either positive or negative
virtual void allocationChanged(int64_t diff) = 0;
+ virtual int64_t currentBytes() {
+ return 0;
+ }
+
+ virtual int64_t peakBytes() {
+ return 0;
+ }
+
protected:
AllocationListener() = default;
};
diff --git a/cpp/core/memory/HbwAllocator.cc b/cpp/core/memory/HbwAllocator.cc
index 9f2c5d6b6..ef0dc82b8 100644
--- a/cpp/core/memory/HbwAllocator.cc
+++ b/cpp/core/memory/HbwAllocator.cc
@@ -85,4 +85,8 @@ int64_t HbwMemoryAllocator::getBytes() const {
return bytes_;
}
+int64_t HbwMemoryAllocator::peakBytes() const {
+ return 0;
+}
+
} // namespace gluten
diff --git a/cpp/core/memory/HbwAllocator.h b/cpp/core/memory/HbwAllocator.h
index 461f25682..e50a71bd5 100644
--- a/cpp/core/memory/HbwAllocator.h
+++ b/cpp/core/memory/HbwAllocator.h
@@ -39,6 +39,8 @@ class HbwMemoryAllocator final : public MemoryAllocator {
int64_t getBytes() const override;
+ int64_t peakBytes() const override;
+
private:
std::atomic_int64_t bytes_{0};
};
diff --git a/cpp/core/memory/MemoryAllocator.cc
b/cpp/core/memory/MemoryAllocator.cc
index 218534cd8..6bcb9926e 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -29,6 +29,7 @@ bool ListenableMemoryAllocator::allocate(int64_t size, void**
out) {
}
if (succeed) {
bytes_ += size;
+ peakBytes_ = std::max(peakBytes_, bytes_.load());
}
return succeed;
}
@@ -41,6 +42,7 @@ bool ListenableMemoryAllocator::allocateZeroFilled(int64_t
nmemb, int64_t size,
}
if (succeed) {
bytes_ += size * nmemb;
+ peakBytes_ = std::max(peakBytes_, bytes_.load());
}
return succeed;
}
@@ -53,6 +55,7 @@ bool ListenableMemoryAllocator::allocateAligned(uint64_t
alignment, int64_t size
}
if (succeed) {
bytes_ += size;
+ peakBytes_ = std::max(peakBytes_, bytes_.load());
}
return succeed;
}
@@ -66,6 +69,7 @@ bool ListenableMemoryAllocator::reallocate(void* p, int64_t
size, int64_t newSiz
}
if (succeed) {
bytes_ += diff;
+ peakBytes_ = std::max(peakBytes_, bytes_.load());
}
return succeed;
}
@@ -84,6 +88,7 @@ bool ListenableMemoryAllocator::reallocateAligned(
}
if (succeed) {
bytes_ += diff;
+ peakBytes_ = std::max(peakBytes_, bytes_.load());
}
return succeed;
}
@@ -104,6 +109,10 @@ int64_t ListenableMemoryAllocator::getBytes() const {
return bytes_;
}
+int64_t ListenableMemoryAllocator::peakBytes() const {
+ return peakBytes_;
+}
+
bool StdMemoryAllocator::allocate(int64_t size, void** out) {
*out = std::malloc(size);
bytes_ += size;
@@ -160,6 +169,10 @@ int64_t StdMemoryAllocator::getBytes() const {
return bytes_;
}
+int64_t StdMemoryAllocator::peakBytes() const {
+ return 0;
+}
+
std::shared_ptr<MemoryAllocator> defaultMemoryAllocator() {
#if defined(GLUTEN_ENABLE_HBM)
static std::shared_ptr<MemoryAllocator> alloc =
HbwMemoryAllocator::newInstance();
diff --git a/cpp/core/memory/MemoryAllocator.h
b/cpp/core/memory/MemoryAllocator.h
index f9d5948fb..a322c9190 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -41,6 +41,8 @@ class MemoryAllocator {
virtual bool free(void* p, int64_t size) = 0;
virtual int64_t getBytes() const = 0;
+
+ virtual int64_t peakBytes() const = 0;
};
class ListenableMemoryAllocator final : public MemoryAllocator {
@@ -63,10 +65,13 @@ class ListenableMemoryAllocator final : public
MemoryAllocator {
int64_t getBytes() const override;
+ int64_t peakBytes() const override;
+
private:
MemoryAllocator* delegated_;
AllocationListener* listener_;
std::atomic_int64_t bytes_{0};
+ int64_t peakBytes_{0};
};
class StdMemoryAllocator final : public MemoryAllocator {
@@ -85,6 +90,8 @@ class StdMemoryAllocator final : public MemoryAllocator {
int64_t getBytes() const override;
+ int64_t peakBytes() const override;
+
private:
std::atomic_int64_t bytes_{0};
};
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc
b/cpp/velox/memory/VeloxMemoryManager.cc
index 93eb93f6b..49edba4c4 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -40,11 +40,8 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
}
uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes)
override {
- if (targetBytes == 0) {
- return 0;
- }
- std::lock_guard<std::recursive_mutex> l(mutex_);
- return growPoolLocked(pool, targetBytes);
+ VELOX_CHECK_EQ(targetBytes, 0, "Gluten has set
MemoryManagerOptions.memoryPoolInitCapacity to 0")
+ return 0;
}
uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t
targetBytes) override {
@@ -56,12 +53,11 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
velox::memory::MemoryPool* pool,
const std::vector<std::shared_ptr<velox::memory::MemoryPool>>&
candidatePools,
uint64_t targetBytes) override {
- GLUTEN_CHECK(candidatePools.size() == 1, "ListenableArbitrator should only
be used within a single root pool");
+ VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only
be used within a single root pool")
auto candidate = candidatePools.back();
- GLUTEN_CHECK(pool->root() == candidate.get(), "Illegal state in
ListenableArbitrator");
- {
+ VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in
ListenableArbitrator") {
std::lock_guard<std::recursive_mutex> l(mutex_);
- growPoolLocked(pool, targetBytes);
+ growPoolLocked(pool->root(), targetBytes);
}
return true;
}
@@ -72,7 +68,7 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
bool allowSpill,
bool allowAbort) override {
facebook::velox::exec::MemoryReclaimer::Stats status;
- GLUTEN_CHECK(pools.size() == 1, "Should shrink a single pool at a time");
+ VELOX_CHECK_EQ(pools.size(), 1, "Gluten only has one root pool");
std::lock_guard<std::recursive_mutex> l(mutex_); // FIXME: Do we have
recursive locking for this mutex?
auto pool = pools.at(0);
const uint64_t oldCapacity = pool->capacity();
@@ -107,8 +103,12 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
" bytes although there is enough space, free bytes: " +
std::to_string(freeBytes));
return 0;
}
- listener_->allocationChanged(bytes);
- return pool->grow(bytes, bytes);
+ auto reclaimedFreeBytes = pool->shrink(0);
+ auto neededBytes = bytes - reclaimedFreeBytes;
+ listener_->allocationChanged(neededBytes);
+ auto ret = pool->grow(bytes, bytes);
+ VELOX_CHECK(ret, "{} failed to grow {} bytes", pool->name(),
velox::succinctBytes(bytes))
+ return ret;
}
uint64_t releaseMemoryLocked(velox::memory::MemoryPool* pool, uint64_t
bytes) {
@@ -179,18 +179,25 @@ VeloxMemoryManager::VeloxMemoryManager(
}
namespace {
-MemoryUsageStats collectMemoryUsageStatsInternal(const
velox::memory::MemoryPool* pool) {
+MemoryUsageStats collectVeloxMemoryUsageStats(const velox::memory::MemoryPool*
pool) {
MemoryUsageStats stats;
stats.set_current(pool->currentBytes());
stats.set_peak(pool->peakBytes());
// walk down root and all children
pool->visitChildren([&](velox::memory::MemoryPool* pool) -> bool {
- stats.mutable_children()->emplace(pool->name(),
collectMemoryUsageStatsInternal(pool));
+ stats.mutable_children()->emplace(pool->name(),
collectVeloxMemoryUsageStats(pool));
return true;
});
return stats;
}
+MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(const MemoryAllocator*
allocator) {
+ MemoryUsageStats stats;
+ stats.set_current(allocator->getBytes());
+ stats.set_peak(allocator->peakBytes());
+ return stats;
+}
+
int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm,
velox::memory::MemoryPool* pool, int64_t size) {
std::string poolName{pool->root()->name() + "/" + pool->name()};
std::string logPrefix{"Shrink[" + poolName + "]: "};
@@ -208,7 +215,14 @@ int64_t
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
} // namespace
const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
- return collectMemoryUsageStatsInternal(veloxAggregatePool_.get());
+ MemoryUsageStats stats;
+ stats.set_current(listener_->currentBytes());
+ stats.set_peak(listener_->peakBytes());
+ stats.mutable_children()->emplace(
+ "gluten::MemoryAllocator",
collectGlutenAllocatorMemoryUsageStats(glutenAlloc_.get()));
+ stats.mutable_children()->emplace(
+ veloxAggregatePool_->name(),
collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
+ return stats;
}
const int64_t VeloxMemoryManager::shrink(int64_t size) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]