This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3349e8d965 [GLUTEN-11485][VL] Fix the race condition in ArrowMemoryPool (#11493)
3349e8d965 is described below
commit 3349e8d9653cbea056b2dbb5ff3fb721a787d60f
Author: Rong Ma <[email protected]>
AuthorDate: Thu Jan 29 01:58:34 2026 +0000
[GLUTEN-11485][VL] Fix the race condition in ArrowMemoryPool (#11493)
* fix and simplify
* fix
---
cpp/core/memory/ArrowMemoryPool.cc | 6 ------
cpp/core/memory/ArrowMemoryPool.h | 9 +++------
cpp/velox/memory/VeloxMemoryManager.cc | 28 +++++++++++++---------------
cpp/velox/memory/VeloxMemoryManager.h | 2 --
4 files changed, 16 insertions(+), 29 deletions(-)
diff --git a/cpp/core/memory/ArrowMemoryPool.cc b/cpp/core/memory/ArrowMemoryPool.cc
index 83a511cf8f..41640fc6e0 100644
--- a/cpp/core/memory/ArrowMemoryPool.cc
+++ b/cpp/core/memory/ArrowMemoryPool.cc
@@ -20,12 +20,6 @@
namespace gluten {
-ArrowMemoryPool::~ArrowMemoryPool() {
- if (releaser_ != nullptr) {
- releaser_(this);
- }
-}
-
arrow::Status ArrowMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) {
if (!allocator_->allocateAligned(alignment, size, reinterpret_cast<void**>(out))) {
return arrow::Status::Invalid("WrappedMemoryPool: Error allocating " + std::to_string(size) + " bytes");
diff --git a/cpp/core/memory/ArrowMemoryPool.h b/cpp/core/memory/ArrowMemoryPool.h
index 841244ec6b..8cf5c71f0e 100644
--- a/cpp/core/memory/ArrowMemoryPool.h
+++ b/cpp/core/memory/ArrowMemoryPool.h
@@ -30,11 +30,10 @@ using ArrowMemoryPoolReleaser = std::function<void(arrow::MemoryPool*)>;
class ArrowMemoryPool final : public arrow::MemoryPool {
public:
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init, hicpp-member-init)
- explicit ArrowMemoryPool(AllocationListener* listener, ArrowMemoryPoolReleaser releaser = nullptr)
- : allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), listener)),
- releaser_(std::move(releaser)) {}
+ explicit ArrowMemoryPool(AllocationListener* listener)
+ : allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), listener)) {}
- ~ArrowMemoryPool() override;
+ ~ArrowMemoryPool() override = default;
ArrowMemoryPool(const ArrowMemoryPool&) = delete;
@@ -64,8 +63,6 @@ class ArrowMemoryPool final : public arrow::MemoryPool {
private:
std::unique_ptr<MemoryAllocator> allocator_ = nullptr;
-
- ArrowMemoryPoolReleaser releaser_;
};
} // namespace gluten
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 54e442dfe2..d829516e0d 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -294,10 +294,13 @@ MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(
return stats;
}
-void logMemoryUsageStats(MemoryUsageStats stats, const std::string& name, const std::string& logPrefix, std::stringstream& ss) {
- ss << logPrefix << "+- " << name
- << " (used: " << velox::succinctBytes(stats.current())
- << ", peak: " << velox::succinctBytes(stats.peak()) << ")\n";
+void logMemoryUsageStats(
+ MemoryUsageStats stats,
+ const std::string& name,
+ const std::string& logPrefix,
+ std::stringstream& ss) {
+ ss << logPrefix << "+- " << name << " (used: " << velox::succinctBytes(stats.current())
+ << ", peak: " << velox::succinctBytes(stats.peak()) << ")\n";
if (stats.children_size() > 0) {
for (auto it = stats.children().begin(); it != stats.children().end(); ++it) {
logMemoryUsageStats(it->second, it->first, logPrefix + " ", ss);
@@ -327,22 +330,17 @@ int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
std::shared_ptr<arrow::MemoryPool> VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
std::lock_guard<std::mutex> l(mutex_);
if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
- auto pool = it->second.lock();
- VELOX_CHECK_NOT_NULL(pool, "Arrow memory pool {} has been destructed", name);
- return pool;
+ if (auto pool = it->second.lock()) {
+ return pool;
+ }
+ arrowPools_.erase(name);
}
- auto pool = std::make_shared<ArrowMemoryPool>(
- blockListener_.get(), [this, name](arrow::MemoryPool* pool) { this->dropMemoryPool(name); });
+
+ auto pool = std::make_shared<ArrowMemoryPool>(blockListener_.get());
arrowPools_.emplace(name, pool);
return pool;
}
-void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
- std::lock_guard<std::mutex> l(mutex_);
- const auto ret = arrowPools_.erase(name);
- VELOX_CHECK_EQ(ret, 1, "Child memory pool {} doesn't exist", name);
-}
-
const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
MemoryUsageStats stats;
stats.set_current(listener_->currentBytes());
diff --git a/cpp/velox/memory/VeloxMemoryManager.h b/cpp/velox/memory/VeloxMemoryManager.h
index 570d1ec43a..9d4b9e9a9b 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -103,8 +103,6 @@ class VeloxMemoryManager final : public MemoryManager {
private:
bool tryDestructSafe();
- void dropMemoryPool(const std::string& name);
-
std::unique_ptr<AllocationListener> listener_;
std::unique_ptr<AllocationListener> blockListener_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]