This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 3e40e3f7 Support dynamic expansion of RDMA block pool (#3155)
3e40e3f7 is described below
commit 3e40e3f72a1e722cabe286bb245e68cdaa99eaf7
Author: Bright Chen <[email protected]>
AuthorDate: Sat Nov 29 23:01:14 2025 +0800
Support dynamic expansion of RDMA block pool (#3155)
---
src/brpc/rdma/block_pool.cpp | 151 +++++++++++++++++++++--------------------
src/brpc/rdma/block_pool.h | 3 +-
src/butil/memory/scope_guard.h | 2 +
3 files changed, 81 insertions(+), 75 deletions(-)
diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp
index 826fc5d1..24907a19 100644
--- a/src/brpc/rdma/block_pool.cpp
+++ b/src/brpc/rdma/block_pool.cpp
@@ -25,10 +25,9 @@
#include "butil/iobuf.h"
#include "butil/object_pool.h"
#include "butil/thread_local.h"
-#include "bthread/bthread.h"
+#include "butil/memory/scope_guard.h"
#include "brpc/rdma/block_pool.h"
-
namespace brpc {
namespace rdma {
@@ -98,6 +97,8 @@ struct GlobalInfo {
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
int region_num[BLOCK_SIZE_COUNT];
butil::Mutex extend_lock;
+ std::vector<IdleNode*> expansion_list[BLOCK_SIZE_COUNT];
+ std::vector<size_t> expansion_size[BLOCK_SIZE_COUNT];
};
static GlobalInfo* g_info = NULL;
@@ -129,36 +130,20 @@ uint32_t GetRegionId(const void* buf) {
return r->id;
}
-// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
-// greater than 1, dynamic memory expansion may cause concurrent modification
-// issues in the memory linked list due to lock contention problems. To address
-// this, we increase the region_num count for each block_type. Dynamic memory
-// expansion is only permitted when both of the following conditions are met:
-// rdma_memory_pool_buckets equals 1
-// g_info->region_num[block_type] is less than 1
-static bool CanExtendBlockRuntime(int block_type) {
- return FLAGS_rdma_memory_pool_buckets == 1 ||
- g_info->region_num[block_type] < 1;
-}
-
-static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
- int block_type) {
- if (CanExtendBlockRuntime(block_type) == false) {
- LOG(INFO) << "Runtime extend memory only support one bucket or region "
- "num is zero for per block_type";
+static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int
block_type) {
+ auto region_base_guard = butil::MakeScopeGuard([region_base]() {
free(region_base);
- errno = ENOMEM;
- return NULL;
- }
+ });
+
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
- LOG(INFO) << "Memory pool reaches max regions";
- free(region_base);
+ LOG_EVERY_SECOND(ERROR) << "Memory pool reaches max regions";
errno = ENOMEM;
return NULL;
}
+
uint32_t id = g_cb(region_base, region_size);
if (id == 0) {
- free(region_base);
+ errno = EINVAL;
return NULL;
}
@@ -170,7 +155,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
for (size_t j = 0; j < i; ++j) {
butil::return_object<IdleNode>(node[j]);
}
- free(region_base);
+ errno = ENOMEM;
return NULL;
}
}
@@ -184,12 +169,15 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
for (size_t i = 0; i < g_buckets; ++i) {
node[i]->start = (void*)(region->start + i * (region_size /
g_buckets));
node[i]->len = region_size / g_buckets;
- node[i]->next = g_info->idle_list[block_type][i];
- g_info->idle_list[block_type][i] = node[i];
- g_info->idle_size[block_type][i] += node[i]->len;
+ node[i]->next = g_info->expansion_list[block_type][i];
+ g_info->expansion_list[block_type][i] = node[i];
+ g_info->expansion_size[block_type][i] += node[i]->len;
}
g_info->region_num[block_type]++;
+ // `region_base' is inuse, cannot be freed.
+ region_base_guard.dismiss();
+
return region_base;
}
@@ -203,7 +191,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
if (FLAGS_rdma_memory_pool_user_specified_memory) {
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
"rdma_memory_pool_user_specified_memory is "
- "true, ExtendBlockPool is disabled";
+ "true, ExtendBlockPool is disabled";
return NULL;
}
@@ -222,24 +210,27 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
return ExtendBlockPoolImpl(region_base, region_size, block_type);
}
-void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
- int block_type) {
- if (FLAGS_rdma_memory_pool_user_specified_memory == false) {
+void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int
block_type) {
+ auto region_base_guard = butil::MakeScopeGuard([region_base]() {
+ free(region_base);
+ });
+
+ if (!FLAGS_rdma_memory_pool_user_specified_memory) {
LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
return NULL;
}
if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
+ errno = EINVAL;
return NULL;
}
- uint64_t index = butil::fast_rand() % g_buckets;
- BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
- BAIDU_SCOPED_LOCK(g_info->extend_lock);
region_size =
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;
+ region_base_guard.dismiss();
+ BAIDU_SCOPED_LOCK(g_info->extend_lock);
return ExtendBlockPoolImpl(region_base, region_size, block_type);
}
@@ -316,6 +307,14 @@ bool InitBlockPool(RegisterCallback cb) {
return false;
}
}
+ g_info->expansion_list[i].resize(g_buckets, NULL);
+ if (g_info->expansion_list[i].size() != g_buckets) {
+ return false;
+ }
+ g_info->expansion_size[i].resize(g_buckets, 0);
+ if (g_info->expansion_size[i].size() != g_buckets) {
+ return false;
+ }
}
g_dump_mutex = new butil::Mutex;
@@ -332,66 +331,74 @@ bool InitBlockPool(RegisterCallback cb) {
return false;
}
+static void MoveExpansionList2EmptyIdleList(int block_type, size_t index) {
+ CHECK(NULL == g_info->idle_list[block_type][index]);
+
+ g_info->idle_list[block_type][index] =
g_info->expansion_list[block_type][index];
+ g_info->idle_size[block_type][index] +=
g_info->expansion_size[block_type][index];
+ g_info->expansion_list[block_type][index] = NULL;
+ g_info->expansion_size[block_type][index] = 0;
+}
+
static void* AllocBlockFrom(int block_type) {
bool locked = false;
if (BAIDU_UNLIKELY(g_dump_enable)) {
g_dump_mutex->lock();
locked = true;
}
+ BUTIL_SCOPE_EXIT {
+ if (locked) {
+ g_dump_mutex->unlock();
+ }
+ };
+
void* ptr = NULL;
- if (block_type == 0 && tls_idle_list != NULL){
+ if (0 == block_type && NULL != tls_idle_list) {
CHECK(tls_idle_num > 0);
IdleNode* n = tls_idle_list;
tls_idle_list = n->next;
ptr = n->start;
butil::return_object<IdleNode>(n);
tls_idle_num--;
- if (locked) {
- g_dump_mutex->unlock();
- }
return ptr;
}
- uint64_t index = butil::fast_rand() % g_buckets;
+ size_t index = butil::fast_rand() % g_buckets;
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
IdleNode* node = g_info->idle_list[block_type][index];
- if (!node) {
+ if (NULL == node) {
BAIDU_SCOPED_LOCK(g_info->extend_lock);
node = g_info->idle_list[block_type][index];
- if (!node) {
- // There is no block left, extend a new region
- if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
- block_type)) {
+ if (NULL == node && NULL != g_info->expansion_list[block_type][index])
{
+ MoveExpansionList2EmptyIdleList(block_type, index);
+ node = g_info->idle_list[block_type][index];
+ }
+ if (NULL == node) {
+ // There is no block left, extend a new region.
+ if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
block_type)) {
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
<< "You can set the size of memory
pool larger. "
<< "Refer to the help message of these
flags: "
<< "rdma_memory_pool_initial_size_mb, "
<< "rdma_memory_pool_increase_size_mb,
"
<< "rdma_memory_pool_max_regions.";
- if (locked) {
- g_dump_mutex->unlock();
- }
return NULL;
}
+ MoveExpansionList2EmptyIdleList(block_type, index);
node = g_info->idle_list[block_type][index];
}
}
- if (node) {
- ptr = node->start;
- if (node->len > g_block_size[block_type]) {
- node->start = (char*)node->start + g_block_size[block_type];
- node->len -= g_block_size[block_type];
- } else {
- g_info->idle_list[block_type][index] = node->next;
- butil::return_object<IdleNode>(node);
- }
- g_info->idle_size[block_type][index] -= g_block_size[block_type];
+ CHECK(NULL != node);
+
+ ptr = node->start;
+ if (node->len > g_block_size[block_type]) {
+ node->start = (char*)node->start + g_block_size[block_type];
+ node->len -= g_block_size[block_type];
} else {
- if (locked) {
- g_dump_mutex->unlock();
- }
- return NULL;
+ g_info->idle_list[block_type][index] = node->next;
+ butil::return_object<IdleNode>(node);
}
+ g_info->idle_size[block_type][index] -= g_block_size[block_type];
// Move more blocks from global list to tls list
if (block_type == 0) {
@@ -417,9 +424,6 @@ static void* AllocBlockFrom(int block_type) {
}
}
- if (locked) {
- g_dump_mutex->unlock();
- }
return ptr;
}
@@ -482,6 +486,12 @@ int DeallocBlock(void* buf) {
g_dump_mutex->lock();
locked = true;
}
+ BUTIL_SCOPE_EXIT {
+ if (locked) {
+ g_dump_mutex->unlock();
+ }
+ };
+
if (block_type == 0 && tls_idle_num <
(uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) {
if (!tls_inited) {
tls_inited = true;
@@ -494,9 +504,6 @@ int DeallocBlock(void* buf) {
tls_idle_num++;
node->next = tls_idle_list;
tls_idle_list = node;
- if (locked) {
- g_dump_mutex->unlock();
- }
return 0;
}
@@ -527,9 +534,6 @@ int DeallocBlock(void* buf) {
g_info->idle_list[block_type][index] = node;
g_info->idle_size[block_type][index] += node->len;
}
- if (locked) {
- g_dump_mutex->unlock();
- }
return 0;
}
@@ -557,7 +561,8 @@ void DumpMemoryPoolInfo(std::ostream& os) {
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
os << "\tFor block size " << GetBlockSize(i) << ":\n";
for (size_t j = 0; j < g_buckets; ++j) {
- os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] <<
"\n";
+ os << "\t\tBucket " << j << ": {" << g_info->idle_size[i][j]
+ << ", " << g_info->expansion_size[i][j] << "}\n";
}
}
os << "Thread Local Cache Info:\n";
diff --git a/src/brpc/rdma/block_pool.h b/src/brpc/rdma/block_pool.h
index 00a31082..f9018e5e 100644
--- a/src/brpc/rdma/block_pool.h
+++ b/src/brpc/rdma/block_pool.h
@@ -80,8 +80,7 @@ bool InitBlockPool(RegisterCallback cb);
// FLAGS_rdma_memory_pool_user_specified_memory is true, user is
responsibility
// of extending memory blocks , this ensuring flexibility for advanced use
// cases.
-void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
- int block_type);
+void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int
block_type);
// Allocate a buf with length at least @a size (require: size>0)
// Return the address allocated, NULL if failed and errno is set.
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
index 7d72a560..377819b5 100644
--- a/src/butil/memory/scope_guard.h
+++ b/src/butil/memory/scope_guard.h
@@ -104,4 +104,6 @@ operator+(ScopeExitHelper, Callback&& callback) {
auto BRPC_ANONYMOUS_VARIABLE(SCOPE_EXIT) = \
::butil::internal::ScopeExitHelper() + [&]() noexcept
+#define BUTIL_SCOPE_EXIT BRPC_SCOPE_EXIT
+
#endif // BUTIL_SCOPED_GUARD_H
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]