This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c6c7fcc2 [VL] Add config for memory pool init capacity to reduce arbitration times (#5815)
1c6c7fcc2 is described below

commit 1c6c7fcc2eb59831e21ae7b24500dae883799bd6
Author: Yang Zhang <[email protected]>
AuthorDate: Wed May 22 09:42:17 2024 +0800

    [VL] Add config for memory pool init capacity to reduce arbitration times (#5815)
---
 .../scala/org/apache/gluten/execution/VeloxTPCHSuite.scala  |  1 +
 cpp/velox/config/VeloxConfig.h                              |  3 +++
 cpp/velox/memory/VeloxMemoryManager.cc                      | 12 +++++++++---
 cpp/velox/tests/MemoryManagerTest.cc                        | 13 ++++++++-----
 .../src/main/scala/org/apache/gluten/GlutenConfig.scala     |  7 +++++++
 5 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index 47f0b2c69..17f27a407 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -42,6 +42,7 @@ abstract class VeloxTPCHTableSupport extends 
VeloxWholeStageTransformerSuite {
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.sql.files.maxPartitionBytes", "1g")
       .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.gluten.sql.columnar.backend.velox.memInitCapacity", "1m")
       .set("spark.memory.offHeap.size", "2g")
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set("spark.sql.autoBroadcastJoinThreshold", "-1")
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index a3112f83e..f57f1293e 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -70,6 +70,9 @@ const bool kEnableSystemExceptionStacktraceDefault = true;
 const std::string kMemoryUseHugePages = 
"spark.gluten.sql.columnar.backend.velox.memoryUseHugePages";
 const bool kMemoryUseHugePagesDefault = false;
 
+const std::string kVeloxMemInitCapacity = 
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
+const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
+
 const std::string kHiveConnectorId = "test-hive";
 const std::string kVeloxCacheEnabled = 
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
 
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 0584780ad..b7bd3a9f9 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -42,8 +42,12 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   }
 
   uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) 
override {
-    VELOX_CHECK_EQ(targetBytes, 0, "Gluten has set 
MemoryManagerOptions.memoryPoolInitCapacity to 0")
-    return 0;
+    std::lock_guard<std::recursive_mutex> l(mutex_);
+    listener_->allocationChanged(targetBytes);
+    if (!pool->grow(targetBytes, 0)) {
+      VELOX_FAIL("Failed to grow root pool's capacity for {}", 
velox::succinctBytes(targetBytes));
+    }
+    return targetBytes;
   }
 
   uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t 
targetBytes) override {
@@ -160,6 +164,8 @@ VeloxMemoryManager::VeloxMemoryManager(
     : MemoryManager(), name_(name), listener_(std::move(listener)) {
   auto reservationBlockSize = 
VeloxBackend::get()->getBackendConf()->get<uint64_t>(
       kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
+  auto memInitCapacity =
+      
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, 
kVeloxMemInitCapacityDefault);
   blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
   listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(allocator.get(), 
blockListener_.get());
   arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
@@ -173,7 +179,7 @@ VeloxMemoryManager::VeloxMemoryManager(
       .coreOnAllocationFailureEnabled = false,
       .allocatorCapacity = velox::memory::kMaxMemory,
       .arbitratorKind = afr.getKind(),
-      .memoryPoolInitCapacity = 0,
+      .memoryPoolInitCapacity = memInitCapacity,
       .memoryPoolTransferCapacity = reservationBlockSize,
       .memoryReclaimWaitMs = 0};
   veloxMemoryManager_ = 
std::make_unique<velox::memory::MemoryManager>(mmOptions);
diff --git a/cpp/velox/tests/MemoryManagerTest.cc b/cpp/velox/tests/MemoryManagerTest.cc
index f256db1b2..400beafcc 100644
--- a/cpp/velox/tests/MemoryManagerTest.cc
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -17,7 +17,7 @@
 
 #include "benchmarks/common/BenchmarkUtils.h"
 #include "compute/VeloxBackend.h"
-#include "config/GlutenConfig.h"
+#include "config/VeloxConfig.h"
 #include "memory/VeloxMemoryManager.h"
 #include "velox/common/base/tests/GTestUtils.h"
 
@@ -48,7 +48,8 @@ class MemoryManagerTest : public ::testing::Test {
  protected:
   static void SetUpTestCase() {
     std::unordered_map<std::string, std::string> conf = {
-        {kMemoryReservationBlockSize, 
std::to_string(kMemoryReservationBlockSizeDefault)}};
+        {kMemoryReservationBlockSize, 
std::to_string(kMemoryReservationBlockSizeDefault)},
+        {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
     initVeloxBackend(conf);
   }
 
@@ -93,6 +94,8 @@ TEST_F(MemoryManagerTest, memoryPoolWithBlockReseravtion) {
 }
 
 TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
+  auto initBytes = listener_->currentBytes();
+
   std::vector<Allocation> allocations;
   std::vector<uint64_t> sizes{
       kMemoryReservationBlockSizeDefault - 1 * kMB, 
kMemoryReservationBlockSizeDefault - 2 * kMB};
@@ -105,7 +108,7 @@ TEST_F(MemoryManagerTest, 
memoryAllocatorWithBlockReservation) {
 
     EXPECT_EQ(allocator_->getBytes(), currentBytes + size);
     EXPECT_EQ(allocator_->peakBytes(), allocator_->getBytes());
-    EXPECT_EQ(listener_->currentBytes(), (i + 1) * 
kMemoryReservationBlockSizeDefault);
+    EXPECT_EQ(listener_->currentBytes(), (i + 1) * 
kMemoryReservationBlockSizeDefault + initBytes);
     EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
   }
 
@@ -114,14 +117,14 @@ TEST_F(MemoryManagerTest, 
memoryAllocatorWithBlockReservation) {
   allocations.pop_back();
   allocator_->free(allocation.buffer, allocation.size);
   EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
-  EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault);
+  EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault + 
initBytes);
 
   currentBytes = allocator_->getBytes();
   allocation = allocations.back();
   allocations.pop_back();
   allocator_->free(allocation.buffer, allocation.size);
   EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
-  EXPECT_EQ(listener_->currentBytes(), 0);
+  EXPECT_EQ(listener_->currentBytes(), initBytes);
 
   ASSERT_EQ(allocator_->getBytes(), 0);
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 02c6bf7fe..1f682557b 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1207,6 +1207,13 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1GB")
 
+  val COLUMNAR_VELOX_MEM_INIT_CAPACITY =
+    buildConf("spark.gluten.sql.columnar.backend.velox.memInitCapacity")
+      .internal()
+      .doc("The initial memory capacity to reserve for a newly created Velox 
query memory pool.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("8MB")
+
   val COLUMNAR_VELOX_SSD_CACHE_PATH =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to