This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 761f1c58a3 [GLUTEN-7800][VL] Add config for max reclaim wait time to avoid deadlock during memory arbitration
761f1c58a3 is described below

commit 761f1c58a3adb6d768680523431ccb67b459f6c5
Author: Yang Zhang <[email protected]>
AuthorDate: Thu Nov 7 08:32:05 2024 +0800

    [GLUTEN-7800][VL] Add config for max reclaim wait time to avoid deadlock during memory arbitration
---
 cpp/velox/config/VeloxConfig.h                     |  3 ++
 cpp/velox/jni/VeloxJniWrapper.cc                   |  1 -
 cpp/velox/memory/VeloxMemoryManager.cc             | 42 ++++++++++++----------
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc     |  1 -
 .../scala/org/apache/gluten/GlutenConfig.scala     |  7 ++++
 5 files changed, 33 insertions(+), 21 deletions(-)

diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 792beda96f..cb70dc6278 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -76,6 +76,9 @@ const bool kMemoryUseHugePagesDefault = false;
 const std::string kVeloxMemInitCapacity = 
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
 const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
 
+const std::string kVeloxMemReclaimMaxWaitMs = 
"spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs";
+const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
+
 const std::string kHiveConnectorId = "test-hive";
 const std::string kVeloxCacheEnabled = 
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
 
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index b8d2b0c3c2..6ea60d651a 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -385,7 +385,6 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp
     jlong dsHandle,
     jlong batchHandle) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
   auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
   auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
   datasource->write(batch);
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 19a5d45804..55420d9380 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -22,6 +22,7 @@
 
 #include "velox/common/memory/MallocAllocator.h"
 #include "velox/common/memory/MemoryPool.h"
+#include "velox/common/memory/SharedArbitrator.h"
 #include "velox/exec/MemoryReclaimer.h"
 
 #include "compute/VeloxBackend.h"
@@ -36,12 +37,6 @@ namespace gluten {
 using namespace facebook;
 
 namespace {
-
-static constexpr std::string_view 
kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
-static constexpr std::string_view 
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
-
 template <typename T>
 T getConfig(
     const std::unordered_map<std::string, std::string>& configs,
@@ -57,24 +52,28 @@ T getConfig(
   return defaultValue;
 }
 } // namespace
+
 /// We assume in a single Spark task. No thread-safety should be guaranteed.
 class ListenableArbitrator : public velox::memory::MemoryArbitrator {
  public:
   ListenableArbitrator(const Config& config, AllocationListener* listener)
       : MemoryArbitrator(config),
         listener_(listener),
-        memoryPoolInitialCapacity_(velox::config::toCapacity(
-            getConfig<std::string>(
-                config.extraConfigs,
-                kMemoryPoolInitialCapacity,
-                std::to_string(kDefaultMemoryPoolInitialCapacity)),
-            velox::config::CapacityUnit::BYTE)),
+        reclaimMaxWaitMs_(
+            
velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)),
+        memoryPoolInitialCapacity_(
+            
velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)),
         memoryPoolTransferCapacity_(velox::config::toCapacity(
             getConfig<std::string>(
                 config.extraConfigs,
-                kMemoryPoolTransferCapacity,
-                std::to_string(kDefaultMemoryPoolTransferCapacity)),
-            velox::config::CapacityUnit::BYTE)) {}
+                kMemoryReservationBlockSize,
+                std::to_string(kMemoryReservationBlockSizeDefault)),
+            velox::config::CapacityUnit::BYTE)) {
+    if (reclaimMaxWaitMs_ == 0) {
+      LOG(WARNING) << kVeloxMemReclaimMaxWaitMs
+                   << " was set to 0, it may cause dead lock when memory 
arbitration has bug.";
+    }
+  }
   std::string kind() const override {
     return kind_;
   }
@@ -121,7 +120,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
       VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only 
be used within a single root pool");
       pool = candidates_.begin()->first;
     }
-    pool->reclaim(targetBytes, 0, status); // ignore the output
+    pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output
     return shrinkCapacity0(pool, 0);
   }
 
@@ -168,6 +167,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   }
 
   gluten::AllocationListener* listener_;
+  const uint64_t reclaimMaxWaitMs_;
   const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
   const uint64_t memoryPoolTransferCapacity_;
 
@@ -208,14 +208,18 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& 
kind, std::unique_ptr<
       kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
   auto memInitCapacity =
       
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, 
kVeloxMemInitCapacityDefault);
+  auto memReclaimMaxWaitMs =
+      
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, 
kVeloxMemReclaimMaxWaitMsDefault);
   blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
   listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), 
blockListener_.get());
   arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
 
   std::unordered_map<std::string, std::string> extraArbitratorConfigs;
-  extraArbitratorConfigs["memory-pool-initial-capacity"] = 
folly::to<std::string>(memInitCapacity) + "B";
-  extraArbitratorConfigs["memory-pool-transfer-capacity"] = 
folly::to<std::string>(reservationBlockSize) + "B";
-  extraArbitratorConfigs["memory-reclaim-max-wait-time"] = 
folly::to<std::string>(0) + "ms";
+  
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)]
 =
+      folly::to<std::string>(memInitCapacity) + "B";
+  extraArbitratorConfigs[kMemoryReservationBlockSize] = 
folly::to<std::string>(reservationBlockSize) + "B";
+  
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)]
 =
+      folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
 
   ArbitratorFactoryRegister afr(listener_.get());
   velox::memory::MemoryManagerOptions mmOptions{
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 34796e378e..e17ad5e2f7 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -117,7 +117,6 @@ arrow::Status 
VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb
 }
 
 arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
-  int64_t rawSize = batch_->size();
   bufferOutputStream_->seekp(0);
   batch_->flush(bufferOutputStream_.get());
   auto buffer = bufferOutputStream_->getBuffer();
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index a28a7d26b3..13bf72d47e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1414,6 +1414,13 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("8MB")
 
+  val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS =
+    buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs")
+      .internal()
+      .doc("The max time in ms to wait for memory reclaim.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(TimeUnit.MINUTES.toMillis(60))
+
   val COLUMNAR_VELOX_SSD_CACHE_PATH =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to