This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1c6c7fcc2 [VL] Add config for memory pool init capacity to reduce arbitration times (#5815)
1c6c7fcc2 is described below
commit 1c6c7fcc2eb59831e21ae7b24500dae883799bd6
Author: Yang Zhang <[email protected]>
AuthorDate: Wed May 22 09:42:17 2024 +0800
[VL] Add config for memory pool init capacity to reduce arbitration times (#5815)
---
.../scala/org/apache/gluten/execution/VeloxTPCHSuite.scala | 1 +
cpp/velox/config/VeloxConfig.h | 3 +++
cpp/velox/memory/VeloxMemoryManager.cc | 12 +++++++++---
cpp/velox/tests/MemoryManagerTest.cc | 13 ++++++++-----
.../src/main/scala/org/apache/gluten/GlutenConfig.scala | 7 +++++++
5 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index 47f0b2c69..17f27a407 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -42,6 +42,7 @@ abstract class VeloxTPCHTableSupport extends
VeloxWholeStageTransformerSuite {
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
+ .set("spark.gluten.sql.columnar.backend.velox.memInitCapacity", "1m")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index a3112f83e..f57f1293e 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -70,6 +70,9 @@ const bool kEnableSystemExceptionStacktraceDefault = true;
const std::string kMemoryUseHugePages =
"spark.gluten.sql.columnar.backend.velox.memoryUseHugePages";
const bool kMemoryUseHugePagesDefault = false;
+const std::string kVeloxMemInitCapacity =
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
+const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
+
const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled =
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 0584780ad..b7bd3a9f9 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -42,8 +42,12 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
}
uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes)
override {
- VELOX_CHECK_EQ(targetBytes, 0, "Gluten has set MemoryManagerOptions.memoryPoolInitCapacity to 0")
- return 0;
+ std::lock_guard<std::recursive_mutex> l(mutex_);
+ listener_->allocationChanged(targetBytes);
+ if (!pool->grow(targetBytes, 0)) {
+ VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes));
+ }
+ return targetBytes;
}
uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t
targetBytes) override {
@@ -160,6 +164,8 @@ VeloxMemoryManager::VeloxMemoryManager(
: MemoryManager(), name_(name), listener_(std::move(listener)) {
auto reservationBlockSize =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(
kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
+ auto memInitCapacity =
+ VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(),
reservationBlockSize);
listenableAlloc_ =
std::make_unique<ListenableMemoryAllocator>(allocator.get(),
blockListener_.get());
arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
@@ -173,7 +179,7 @@ VeloxMemoryManager::VeloxMemoryManager(
.coreOnAllocationFailureEnabled = false,
.allocatorCapacity = velox::memory::kMaxMemory,
.arbitratorKind = afr.getKind(),
- .memoryPoolInitCapacity = 0,
+ .memoryPoolInitCapacity = memInitCapacity,
.memoryPoolTransferCapacity = reservationBlockSize,
.memoryReclaimWaitMs = 0};
veloxMemoryManager_ =
std::make_unique<velox::memory::MemoryManager>(mmOptions);
diff --git a/cpp/velox/tests/MemoryManagerTest.cc b/cpp/velox/tests/MemoryManagerTest.cc
index f256db1b2..400beafcc 100644
--- a/cpp/velox/tests/MemoryManagerTest.cc
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -17,7 +17,7 @@
#include "benchmarks/common/BenchmarkUtils.h"
#include "compute/VeloxBackend.h"
-#include "config/GlutenConfig.h"
+#include "config/VeloxConfig.h"
#include "memory/VeloxMemoryManager.h"
#include "velox/common/base/tests/GTestUtils.h"
@@ -48,7 +48,8 @@ class MemoryManagerTest : public ::testing::Test {
protected:
static void SetUpTestCase() {
std::unordered_map<std::string, std::string> conf = {
- {kMemoryReservationBlockSize,
std::to_string(kMemoryReservationBlockSizeDefault)}};
+ {kMemoryReservationBlockSize,
std::to_string(kMemoryReservationBlockSizeDefault)},
+ {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
initVeloxBackend(conf);
}
@@ -93,6 +94,8 @@ TEST_F(MemoryManagerTest, memoryPoolWithBlockReseravtion) {
}
TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
+ auto initBytes = listener_->currentBytes();
+
std::vector<Allocation> allocations;
std::vector<uint64_t> sizes{
kMemoryReservationBlockSizeDefault - 1 * kMB,
kMemoryReservationBlockSizeDefault - 2 * kMB};
@@ -105,7 +108,7 @@ TEST_F(MemoryManagerTest,
memoryAllocatorWithBlockReservation) {
EXPECT_EQ(allocator_->getBytes(), currentBytes + size);
EXPECT_EQ(allocator_->peakBytes(), allocator_->getBytes());
- EXPECT_EQ(listener_->currentBytes(), (i + 1) *
kMemoryReservationBlockSizeDefault);
+ EXPECT_EQ(listener_->currentBytes(), (i + 1) *
kMemoryReservationBlockSizeDefault + initBytes);
EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
}
@@ -114,14 +117,14 @@ TEST_F(MemoryManagerTest,
memoryAllocatorWithBlockReservation) {
allocations.pop_back();
allocator_->free(allocation.buffer, allocation.size);
EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
- EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault);
+ EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault +
initBytes);
currentBytes = allocator_->getBytes();
allocation = allocations.back();
allocations.pop_back();
allocator_->free(allocation.buffer, allocation.size);
EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
- EXPECT_EQ(listener_->currentBytes(), 0);
+ EXPECT_EQ(listener_->currentBytes(), initBytes);
ASSERT_EQ(allocator_->getBytes(), 0);
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 02c6bf7fe..1f682557b 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1207,6 +1207,13 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1GB")
+ val COLUMNAR_VELOX_MEM_INIT_CAPACITY =
+ buildConf("spark.gluten.sql.columnar.backend.velox.memInitCapacity")
+ .internal()
+ .doc("The initial memory capacity to reserve for a newly created Velox query memory pool.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("8MB")
+
val COLUMNAR_VELOX_SSD_CACHE_PATH =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]