This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d796e08db0 [GLUTEN-7800][VL] Add config for max reclaim wait time to 
avoid deadlock during memory arbitration (#7990)
d796e08db0 is described below

commit d796e08db0744417a98b0ecba27f34d118450682
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Nov 20 11:53:04 2024 +0800

    [GLUTEN-7800][VL] Add config for max reclaim wait time to avoid dead lock 
to avoid deadlock during memory arbitration (#7990)
---
 cpp/velox/config/VeloxConfig.h                      |  3 +++
 cpp/velox/memory/VeloxMemoryManager.cc              | 21 ++++++++++++++++-----
 .../main/scala/org/apache/gluten/GlutenConfig.scala |  7 +++++++
 3 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 792beda96f..cb70dc6278 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -76,6 +76,9 @@ const bool kMemoryUseHugePagesDefault = false;
 const std::string kVeloxMemInitCapacity = 
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
 const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
 
+const std::string kVeloxMemReclaimMaxWaitMs = 
"spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs";
+const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
+
 const std::string kHiveConnectorId = "test-hive";
 const std::string kVeloxCacheEnabled = 
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
 
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 1d281b22dc..32978281ef 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -41,6 +41,8 @@ static constexpr std::string_view 
kMemoryPoolInitialCapacity{"memory-pool-initia
 static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
 static constexpr std::string_view 
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
 static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+static constexpr std::string_view 
kMemoryReclaimMaxWaitMs{"memory-reclaim-max-wait-time"};
+static constexpr std::string_view kDefaultMemoryReclaimMaxWaitMs{"3600000ms"};
 
 template <typename T>
 T getConfig(
@@ -74,7 +76,13 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
                 config.extraConfigs,
                 kMemoryPoolTransferCapacity,
                 std::to_string(kDefaultMemoryPoolTransferCapacity)),
-            velox::config::CapacityUnit::BYTE)) {}
+            velox::config::CapacityUnit::BYTE)),
+        memoryReclaimMaxWaitMs_(
+            
std::chrono::duration_cast<std::chrono::milliseconds>(velox::config::toDuration(getConfig<std::string>(
+                                                                      
config.extraConfigs,
+                                                                      
kMemoryReclaimMaxWaitMs,
+                                                                      
std::string(kDefaultMemoryReclaimMaxWaitMs))))
+                .count()) {}
   std::string kind() const override {
     return kind_;
   }
@@ -123,7 +131,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
       VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only 
be used within a single root pool");
       pool = candidates_.begin()->first;
     }
-    pool->reclaim(targetBytes, 0, status); // ignore the output
+    pool->reclaim(targetBytes, memoryReclaimMaxWaitMs_, status); // ignore the 
output
     return shrinkCapacity0(pool, 0);
   }
 
@@ -172,6 +180,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   gluten::AllocationListener* listener_;
   const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
   const uint64_t memoryPoolTransferCapacity_;
+  const uint64_t memoryReclaimMaxWaitMs_;
 
   mutable std::mutex mutex_;
   inline static std::string kind_ = "GLUTEN";
@@ -210,14 +219,16 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& 
kind, std::unique_ptr<
       kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
   auto memInitCapacity =
       
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, 
kVeloxMemInitCapacityDefault);
+  auto memReclaimMaxWaitMs =
+      
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, 
kVeloxMemReclaimMaxWaitMsDefault);
   blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
   listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), 
blockListener_.get());
   arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
 
   std::unordered_map<std::string, std::string> extraArbitratorConfigs;
-  extraArbitratorConfigs["memory-pool-initial-capacity"] = 
folly::to<std::string>(memInitCapacity) + "B";
-  extraArbitratorConfigs["memory-pool-transfer-capacity"] = 
folly::to<std::string>(reservationBlockSize) + "B";
-  extraArbitratorConfigs["memory-reclaim-max-wait-time"] = 
folly::to<std::string>(0) + "ms";
+  extraArbitratorConfigs[std::string(kMemoryPoolInitialCapacity)] = 
folly::to<std::string>(memInitCapacity) + "B";
+  extraArbitratorConfigs[std::string(kMemoryPoolTransferCapacity)] = 
folly::to<std::string>(reservationBlockSize) + "B";
+  extraArbitratorConfigs[std::string(kMemoryReclaimMaxWaitMs)] = 
folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
 
   ArbitratorFactoryRegister afr(listener_.get());
   velox::memory::MemoryManagerOptions mmOptions{
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index c46d874690..f756eb20a6 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1452,6 +1452,13 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("8MB")
 
+  val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS =
+    buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs")
+      .internal()
+      .doc("The max time in ms to wait for memory reclaim.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(TimeUnit.MINUTES.toMillis(60))
+
   val COLUMNAR_VELOX_SSD_CACHE_PATH =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to