This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e40e3f7 Support dynamic expansion of RDMA block pool (#3155)
3e40e3f7 is described below

commit 3e40e3f72a1e722cabe286bb245e68cdaa99eaf7
Author: Bright Chen <[email protected]>
AuthorDate: Sat Nov 29 23:01:14 2025 +0800

    Support dynamic expansion of RDMA block pool (#3155)
---
 src/brpc/rdma/block_pool.cpp   | 151 +++++++++++++++++++++--------------------
 src/brpc/rdma/block_pool.h     |   3 +-
 src/butil/memory/scope_guard.h |   2 +
 3 files changed, 81 insertions(+), 75 deletions(-)

diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp
index 826fc5d1..24907a19 100644
--- a/src/brpc/rdma/block_pool.cpp
+++ b/src/brpc/rdma/block_pool.cpp
@@ -25,10 +25,9 @@
 #include "butil/iobuf.h"
 #include "butil/object_pool.h"
 #include "butil/thread_local.h"
-#include "bthread/bthread.h"
+#include "butil/memory/scope_guard.h"
 #include "brpc/rdma/block_pool.h"
 
-
 namespace brpc {
 namespace rdma {
 
@@ -98,6 +97,8 @@ struct GlobalInfo {
     std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
     int region_num[BLOCK_SIZE_COUNT];
     butil::Mutex extend_lock;
+    std::vector<IdleNode*> expansion_list[BLOCK_SIZE_COUNT];
+    std::vector<size_t> expansion_size[BLOCK_SIZE_COUNT];
 };
 static GlobalInfo* g_info = NULL;
 
@@ -129,36 +130,20 @@ uint32_t GetRegionId(const void* buf) {
     return r->id;
 }
 
-// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
-// greater than 1, dynamic memory expansion may cause concurrent modification
-// issues in the memory linked list due to lock contention problems. To address
-// this, we increase the region_num count for each block_type. Dynamic memory
-// expansion is only permitted when both of the following conditions are met:
-// rdma_memory_pool_buckets equals 1
-// g_info->region_num[block_type] is less than 1
-static bool CanExtendBlockRuntime(int block_type) {
-    return FLAGS_rdma_memory_pool_buckets == 1 ||
-           g_info->region_num[block_type] < 1;
-}
-
-static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
-                                 int block_type) {
-    if (CanExtendBlockRuntime(block_type) == false) {
-        LOG(INFO) << "Runtime extend memory only support one bucket or region "
-                     "num is zero for per block_type";
+static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int block_type) {
+    auto region_base_guard = butil::MakeScopeGuard([region_base]() {
         free(region_base);
-        errno = ENOMEM;
-        return NULL;
-    }
+    });
+
     if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
-        LOG(INFO) << "Memory pool reaches max regions";
-        free(region_base);
+        LOG_EVERY_SECOND(ERROR) << "Memory pool reaches max regions";
         errno = ENOMEM;
         return NULL;
     }
+
     uint32_t id = g_cb(region_base, region_size);
     if (id == 0) {
-        free(region_base);
+        errno = EINVAL;
         return NULL;
     }
 
@@ -170,7 +155,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
             for (size_t j = 0; j < i; ++j) {
                 butil::return_object<IdleNode>(node[j]);
             }
-            free(region_base);
+            errno = ENOMEM;
             return NULL;
         }
     }
@@ -184,12 +169,15 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
     for (size_t i = 0; i < g_buckets; ++i) {
         node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
         node[i]->len = region_size / g_buckets;
-        node[i]->next = g_info->idle_list[block_type][i];
-        g_info->idle_list[block_type][i] = node[i];
-        g_info->idle_size[block_type][i] += node[i]->len;
+        node[i]->next = g_info->expansion_list[block_type][i];
+        g_info->expansion_list[block_type][i] = node[i];
+        g_info->expansion_size[block_type][i] += node[i]->len;
     }
     g_info->region_num[block_type]++;
 
+    // `region_base' is inuse, cannot be freed.
+    region_base_guard.dismiss();
+
     return region_base;
 }
 
@@ -203,7 +191,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
     if (FLAGS_rdma_memory_pool_user_specified_memory) {
         LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
                                    "rdma_memory_pool_user_specified_memory is "
-                                   "true,  ExtendBlockPool is disabled";
+                                   "true, ExtendBlockPool is disabled";
         return NULL;
     }
 
@@ -222,24 +210,27 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
     return ExtendBlockPoolImpl(region_base, region_size, block_type);
 }
 
-void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
-                            int block_type) {
-    if (FLAGS_rdma_memory_pool_user_specified_memory == false) {
+void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type) {
+    auto region_base_guard = butil::MakeScopeGuard([region_base]() {
+        free(region_base);
+    });
+
+    if (!FLAGS_rdma_memory_pool_user_specified_memory) {
         LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
         return NULL;
     }
     if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
         LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
+        errno = EINVAL;
         return NULL;
     }
 
-    uint64_t index = butil::fast_rand() % g_buckets;
-    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
-    BAIDU_SCOPED_LOCK(g_info->extend_lock);
     region_size =
         region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
     region_size *= g_block_size[block_type] * g_buckets;
 
+    region_base_guard.dismiss();
+    BAIDU_SCOPED_LOCK(g_info->extend_lock);
     return ExtendBlockPoolImpl(region_base, region_size, block_type);
 }
 
@@ -316,6 +307,14 @@ bool InitBlockPool(RegisterCallback cb) {
                 return false;
             }
         }
+        g_info->expansion_list[i].resize(g_buckets, NULL);
+        if (g_info->expansion_list[i].size() != g_buckets) {
+            return false;
+        }
+        g_info->expansion_size[i].resize(g_buckets, 0);
+        if (g_info->expansion_size[i].size() != g_buckets) {
+            return false;
+        }
     }
 
     g_dump_mutex = new butil::Mutex;
@@ -332,66 +331,74 @@ bool InitBlockPool(RegisterCallback cb) {
     return false;
 }
 
+static void MoveExpansionList2EmptyIdleList(int block_type, size_t index) {
+    CHECK(NULL == g_info->idle_list[block_type][index]);
+
+    g_info->idle_list[block_type][index] = g_info->expansion_list[block_type][index];
+    g_info->idle_size[block_type][index] += g_info->expansion_size[block_type][index];
+    g_info->expansion_list[block_type][index] = NULL;
+    g_info->expansion_size[block_type][index] = 0;
+}
+
 static void* AllocBlockFrom(int block_type) {
     bool locked = false;
     if (BAIDU_UNLIKELY(g_dump_enable)) {
         g_dump_mutex->lock();
         locked = true;
     }
+    BUTIL_SCOPE_EXIT {
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+    };
+
     void* ptr = NULL;
-    if (block_type == 0 && tls_idle_list != NULL){
+    if (0 == block_type && NULL != tls_idle_list) {
         CHECK(tls_idle_num > 0);
         IdleNode* n = tls_idle_list;
         tls_idle_list = n->next;
         ptr = n->start;
         butil::return_object<IdleNode>(n);
         tls_idle_num--;
-        if (locked) {
-            g_dump_mutex->unlock();
-        }
         return ptr;
     }
 
-    uint64_t index = butil::fast_rand() % g_buckets;
+    size_t index = butil::fast_rand() % g_buckets;
     BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
     IdleNode* node = g_info->idle_list[block_type][index];
-    if (!node) {
+    if (NULL == node) {
         BAIDU_SCOPED_LOCK(g_info->extend_lock);
         node = g_info->idle_list[block_type][index];
-        if (!node) {
-            // There is no block left, extend a new region
-            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
-                                 block_type)) {
+        if (NULL == node && NULL != g_info->expansion_list[block_type][index]) {
+            MoveExpansionList2EmptyIdleList(block_type, index);
+            node = g_info->idle_list[block_type][index];
+        }
+        if (NULL == node) {
+            // There is no block left, extend a new region.
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb, block_type)) {
                 LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
                                         << "You can set the size of memory pool larger. "
                                         << "Refer to the help message of these flags: "
                                         << "rdma_memory_pool_initial_size_mb, "
                                         << "rdma_memory_pool_increase_size_mb, "
                                         << "rdma_memory_pool_max_regions.";
-                if (locked) {
-                    g_dump_mutex->unlock();
-                }
                 return NULL;
             }
+            MoveExpansionList2EmptyIdleList(block_type, index);
             node = g_info->idle_list[block_type][index];
         }
     }
-    if (node) {
-        ptr = node->start;
-        if (node->len > g_block_size[block_type]) {
-            node->start = (char*)node->start + g_block_size[block_type];
-            node->len -= g_block_size[block_type];
-        } else {
-            g_info->idle_list[block_type][index] = node->next;
-            butil::return_object<IdleNode>(node);
-        }
-        g_info->idle_size[block_type][index] -= g_block_size[block_type];
+    CHECK(NULL != node);
+
+    ptr = node->start;
+    if (node->len > g_block_size[block_type]) {
+        node->start = (char*)node->start + g_block_size[block_type];
+        node->len -= g_block_size[block_type];
     } else {
-        if (locked) {
-            g_dump_mutex->unlock();
-        }
-        return NULL;
+        g_info->idle_list[block_type][index] = node->next;
+        butil::return_object<IdleNode>(node);
     }
+    g_info->idle_size[block_type][index] -= g_block_size[block_type];
 
     // Move more blocks from global list to tls list
     if (block_type == 0) {
@@ -417,9 +424,6 @@ static void* AllocBlockFrom(int block_type) {
         }
     }
 
-    if (locked) {
-        g_dump_mutex->unlock();
-    }
     return ptr;
 }
 
@@ -482,6 +486,12 @@ int DeallocBlock(void* buf) {
         g_dump_mutex->lock();
         locked = true;
     }
+    BUTIL_SCOPE_EXIT {
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+    };
+
     if (block_type == 0 && tls_idle_num < (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) {
         if (!tls_inited) {
             tls_inited = true;
@@ -494,9 +504,6 @@ int DeallocBlock(void* buf) {
         tls_idle_num++;
         node->next = tls_idle_list;
         tls_idle_list = node;
-        if (locked) {
-            g_dump_mutex->unlock();
-        }
         return 0;
     }
 
@@ -527,9 +534,6 @@ int DeallocBlock(void* buf) {
         g_info->idle_list[block_type][index] = node;
         g_info->idle_size[block_type][index] += node->len;
     }
-    if (locked) {
-        g_dump_mutex->unlock();
-    }
     return 0;
 }
 
@@ -557,7 +561,8 @@ void DumpMemoryPoolInfo(std::ostream& os) {
     for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
         os << "\tFor block size " << GetBlockSize(i) << ":\n";
         for (size_t j = 0; j < g_buckets; ++j) {
-            os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] << "\n";
+            os << "\t\tBucket " << j << ": {" << g_info->idle_size[i][j]
+               << ", " << g_info->expansion_size[i][j] << "}\n";
         }
     }
     os << "Thread Local Cache Info:\n";
diff --git a/src/brpc/rdma/block_pool.h b/src/brpc/rdma/block_pool.h
index 00a31082..f9018e5e 100644
--- a/src/brpc/rdma/block_pool.h
+++ b/src/brpc/rdma/block_pool.h
@@ -80,8 +80,7 @@ bool InitBlockPool(RegisterCallback cb);
 // FLAGS_rdma_memory_pool_user_specified_memory is true, user is  responsibility
 // of extending memory blocks , this ensuring flexibility for advanced use
 // cases.
-void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
-                            int block_type);
+void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type);
 
 // Allocate a buf with length at least @a size (require: size>0)
 // Return the address allocated, NULL if failed and errno is set.
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
index 7d72a560..377819b5 100644
--- a/src/butil/memory/scope_guard.h
+++ b/src/butil/memory/scope_guard.h
@@ -104,4 +104,6 @@ operator+(ScopeExitHelper, Callback&& callback) {
   auto BRPC_ANONYMOUS_VARIABLE(SCOPE_EXIT) =                \
       ::butil::internal::ScopeExitHelper() + [&]() noexcept
 
+#define BUTIL_SCOPE_EXIT BRPC_SCOPE_EXIT
+
 #endif // BUTIL_SCOPED_GUARD_H


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to