This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ec62a76c8 [VL] Daily update velox version 08-08 (#6752)
ec62a76c8 is described below

commit ec62a76c88b264262795577fc60fa34f328bb33e
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Aug 8 22:40:37 2024 +0800

    [VL] Daily update velox version 08-08 (#6752)
---
 cpp/velox/compute/WholeStageResultIterator.cc |  2 +-
 cpp/velox/memory/VeloxMemoryManager.cc        | 88 ++++++++++++++++++---------
 ep/build-velox/src/get_velox.sh               |  2 +-
 3 files changed, 62 insertions(+), 30 deletions(-)

diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 4544c0165..eb700c648 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -245,7 +245,7 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
     SuspendedSection suspender;
     velox::exec::MemoryReclaimer::Stats status;
     auto* mm = memoryManager_->getMemoryManager();
-    uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
+    uint64_t spilledOut = mm->arbitrator()->shrinkCapacity(remaining); // this conducts spilling
     LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes.";
     uint64_t total = shrunken + spilledOut;
     VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes.";
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 442090004..3f30d8627 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -35,63 +35,91 @@ namespace gluten {
 
 using namespace facebook;
 
+namespace {
+
+static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
+static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+
+template <typename T>
+T getConfig(
+    const std::unordered_map<std::string, std::string>& configs,
+    const std::string_view& key,
+    const T& defaultValue) {
+  if (configs.count(std::string(key)) > 0) {
+    try {
+      return folly::to<T>(configs.at(std::string(key)));
+    } catch (const std::exception& e) {
+      VELOX_USER_FAIL("Failed while parsing SharedArbitrator configs: {}", e.what());
+    }
+  }
+  return defaultValue;
+}
+} // namespace
 /// We assume in a single Spark task. No thread-safety should be guaranteed.
 class ListenableArbitrator : public velox::memory::MemoryArbitrator {
  public:
   ListenableArbitrator(const Config& config, AllocationListener* listener)
-      : MemoryArbitrator(config), listener_(listener) {}
-
+      : MemoryArbitrator(config),
+        listener_(listener),
+        memoryPoolInitialCapacity_(
+            getConfig<uint64_t>(config.extraConfigs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)),
+        memoryPoolTransferCapacity_(
+            getConfig<uint64_t>(config.extraConfigs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) {
+  }
   std::string kind() const override {
     return kind_;
   }
 
-  uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
-    std::lock_guard<std::recursive_mutex> l(mutex_);
-    listener_->allocationChanged(targetBytes);
-    if (!growPool(pool, targetBytes, 0)) {
-      VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes));
-    }
-    return targetBytes;
+  void addPool(const std::shared_ptr<velox::memory::MemoryPool>& pool) override {
+    VELOX_CHECK_EQ(pool->capacity(), 0);
+
+    std::unique_lock guard{mutex_};
+    VELOX_CHECK_EQ(candidates_.count(pool.get()), 0);
+    candidates_.emplace(pool.get(), pool->weak_from_this());
   }
 
-  uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
-    std::lock_guard<std::recursive_mutex> l(mutex_);
-    return shrinkCapacityLocked(pool, targetBytes);
+  void removePool(velox::memory::MemoryPool* pool) override {
+    VELOX_CHECK_EQ(pool->reservedBytes(), 0);
+    shrinkCapacity(pool, pool->capacity());
+
+    std::unique_lock guard{mutex_};
+    const auto ret = candidates_.erase(pool);
+    VELOX_CHECK_EQ(ret, 1);
   }
 
-  bool growCapacity(
-      velox::memory::MemoryPool* pool,
-      const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& candidatePools,
-      uint64_t targetBytes) override {
+  bool growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
     velox::memory::ScopedMemoryArbitrationContext ctx(pool);
-    VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only be used within a single root pool")
-    auto candidate = candidatePools.back();
-    VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator");
+    VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool")
+    auto candidate = candidates_.begin()->first;
+    VELOX_CHECK(pool->root() == candidate, "Illegal state in ListenableArbitrator");
 
     std::lock_guard<std::recursive_mutex> l(mutex_);
     growCapacityLocked(pool->root(), targetBytes);
     return true;
   }
 
-  uint64_t shrinkCapacity(
-      const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& pools,
-      uint64_t targetBytes,
-      bool allowSpill,
-      bool allowAbort) override {
+  uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override {
     velox::memory::ScopedMemoryArbitrationContext ctx((const velox::memory::MemoryPool*)nullptr);
     facebook::velox::exec::MemoryReclaimer::Stats status;
-    VELOX_CHECK_EQ(pools.size(), 1, "Gluten only has one root pool");
+    VELOX_CHECK_EQ(candidates_.size(), 1, "Gluten only has one root pool");
     std::lock_guard<std::recursive_mutex> l(mutex_); // FIXME: Do we have recursive locking for this mutex?
-    auto pool = pools.at(0);
+    auto pool = candidates_.begin()->first;
     const uint64_t oldCapacity = pool->capacity();
     pool->reclaim(targetBytes, 0, status); // ignore the output
-    shrinkPool(pool.get(), 0);
+    shrinkPool(pool, 0);
     const uint64_t newCapacity = pool->capacity();
     uint64_t total = oldCapacity - newCapacity;
     listener_->allocationChanged(-total);
     return total;
   }
 
+  uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
+    std::lock_guard<std::recursive_mutex> l(mutex_);
+    return shrinkCapacityLocked(pool, targetBytes);
+  }
+
   Stats stats() const override {
     Stats stats; // no-op
     return stats;
@@ -131,8 +159,12 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
   }
 
   gluten::AllocationListener* listener_;
-  std::recursive_mutex mutex_;
+  const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
+  const uint64_t memoryPoolTransferCapacity_;
+
+  mutable std::recursive_mutex mutex_;
   inline static std::string kind_ = "GLUTEN";
+  std::unordered_map<velox::memory::MemoryPool*, std::weak_ptr<velox::memory::MemoryPool>> candidates_;
 };
 
 class ArbitratorFactoryRegister {
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 60d87bfb1..79a1d84fa 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
 set -exu
 
 VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_08_06
+VELOX_BRANCH=2024_08_08
 VELOX_HOME=""
 
 OS=`uname -s`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to