Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939476215


##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+// Size in MB of the first region created by InitBlockPool.
+// InitBlockPool rejects values outside
+// [RDMA_MEMORY_POOL_MIN_SIZE, RDMA_MEMORY_POOL_MAX_SIZE].
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+// Size in MB of each region added when the pool is extended.
+// Checked against the same range as the initial size.
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+// Upper bound on the number of regions; InitBlockPool requires it to be in
+// [RDMA_MEMORY_POOL_MIN_REGIONS, RDMA_MEMORY_POOL_MAX_REGIONS].
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+// Number of idle-list buckets per block size; InitBlockPool requires it to
+// be in [RDMA_MEMORY_POOL_MIN_BUCKETS, RDMA_MEMORY_POOL_MAX_BUCKETS].
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+// NOTE(review): presumably caps the per-thread idle list of default-size
+// blocks; the code that reads this flag is not visible here - confirm.
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128,
+             "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+// It receives the base address and byte length of the new region and
+// returns the region id; ExtendBlockPool treats a return value of 0 as
+// failure.
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+// Block type identifiers; each one is an index into g_block_size.
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+// Block size in bytes for each block type above.
+static size_t g_block_size[BLOCK_SIZE_COUNT] = {
+    8192, 65536, 2 * BYTES_IN_MB
+};
+
+// One contiguous chunk of idle memory, linked into a singly linked list.
+// ExtendBlockPool creates one node per bucket covering that bucket's share
+// of a new region.
+struct IdleNode {
+    void* start;     // address of the first idle byte of the chunk
+    size_t len;      // length of the chunk in bytes
+    IdleNode* next;  // next node in the list, NULL at the tail
+};
+
+// Descriptor of one memory region owned by the pool.
+// A slot whose start is 0 is unused; GetRegion stops scanning at the first
+// such slot.
+struct Region {
+    // Initialize every field, not only start, so that an instance without
+    // static storage duration never exposes indeterminate values.
+    Region() : start(0), size(0), block_type(0), id(0) {}
+    uintptr_t start;      // base address of the region, 0 when unused
+    size_t size;          // length of the region in bytes
+    uint32_t block_type;  // block type served by the region (g_block_size index)
+    uint32_t id;  // lkey
+};
+
+// Allowed range of FLAGS_rdma_memory_pool_max_regions; the upper bound is
+// also the capacity of the static region table below.
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+// Region table, filled in order by ExtendBlockPool.
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+// Number of slots of g_regions already handed out.
+static int g_region_num = 0;
+
+// Allowed range, in MB, of the initial and increase size flags.
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 32MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+// Allowed range of FLAGS_rdma_memory_pool_buckets.
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+// Bucket count in effect; InitBlockPool copies it from the flag.
+static size_t g_buckets = 1;
+
+// When g_dump_enable is set, AllocBlockFrom holds g_dump_mutex while it
+// allocates from the thread-local idle list.
+// NOTE(review): the code that toggles g_dump_enable is not visible here.
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+// Per-thread idle list and its node count; AllocBlockFrom pops from it
+// before touching the shared buckets.
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+// NOTE(review): tls_inited, g_tls_info_cnt and g_tls_info are not used in
+// the visible part of the file; presumably they register each thread's
+// tls_idle_num under g_tls_info_mutex - confirm.
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+// All three vectors are indexed as [block type][bucket] and are resized to
+// g_buckets entries by InitBlockPool.
+struct GlobalInfo {
+    // Head of the idle node list of each bucket.
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    // Mutex AllocBlockFrom takes before reading a bucket's idle list.
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    // Bytes added to each bucket by ExtendBlockPool.
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    // Taken by AllocBlockFrom around its call to ExtendBlockPool.
+    butil::Mutex extend_lock;
+};
+// Allocated once by InitBlockPool.
+static GlobalInfo* g_info = NULL;
+
+// Find the region whose address range [start, start + size) contains buf.
+// Returns NULL when no region matches. errno is set to EINVAL only when buf
+// itself is NULL.
+static inline Region* GetRegion(const void* buf) {
+    if (buf == NULL) {
+        errno = EINVAL;
+        return NULL;
+    }
+    const uintptr_t target = (uintptr_t)buf;
+    for (int idx = 0; idx < FLAGS_rdma_memory_pool_max_regions; ++idx) {
+        Region* candidate = &g_regions[idx];
+        if (candidate->start == 0) {
+            // Slots are handed out in order, so the first unused slot ends
+            // the search.
+            return NULL;
+        }
+        if (target < candidate->start) {
+            continue;
+        }
+        if (target < candidate->start + candidate->size) {
+            return candidate;
+        }
+    }
+    return NULL;
+}
+
+// Return the id of the region containing buf, or 0 when buf is NULL or does
+// not belong to any region.
+uint32_t GetRegionId(const void* buf) {
+    const Region* region = GetRegion(buf);
+    return region != NULL ? region->id : 0;
+}
+
+// Extend the block pool with a new region (with different region ID)
+//
+// region_size is given in MB. It is rounded down to a multiple of
+// (block size * bucket count) so that the region splits evenly across the
+// buckets. The memory is allocated with 4096-byte alignment, passed to the
+// registered callback g_cb, and split into one idle node per bucket.
+//
+// Returns the base address of the new region, or NULL on failure with
+//   errno == EINVAL  region_size is 0 (before or after rounding), or
+//                    block_type / g_buckets is out of range;
+//   errno == ENOMEM  the region table is full or an allocation failed.
+// When the callback returns 0 the memory is released and NULL is returned;
+// errno is whatever the callback left behind.
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1 ||
+        block_type < 0 || block_type >= BLOCK_SIZE_COUNT) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    // Compare with >= and also against the static table capacity, so that a
+    // flag value changed after initialization cannot push the index past
+    // the end of g_regions.
+    if (g_region_num >= FLAGS_rdma_memory_pool_max_regions ||
+        g_region_num >= RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // The node array below has a fixed capacity; InitBlockPool enforces the
+    // same bound on g_buckets.
+    if (g_buckets < 1 || g_buckets > (size_t)RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    // Regularize region size
+    const size_t unit = g_block_size[block_type] * g_buckets;
+    region_size = region_size * BYTES_IN_MB / unit * unit;
+    if (region_size == 0) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    LOG(INFO) << "Start extend rdma memory "
+              << region_size / BYTES_IN_MB << "MB";
+
+    void* region_base = NULL;
+    // posix_memalign reports failure through its return value and leaves
+    // errno untouched, so copy the code into errno for PLOG and the caller.
+    const int rc = posix_memalign(&region_base, 4096, region_size);
+    if (rc != 0) {
+        errno = rc;
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        errno = rc;  // logging may have overwritten it
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    // Fixed-size array instead of a variable-length array, which is not
+    // standard C++.
+    IdleNode* node[RDMA_MEMORY_POOL_MAX_BUCKETS];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            errno = ENOMEM;
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            // NOTE(review): the region has already been passed to g_cb; no
+            // deregistration hook is visible here, so the memory is freed
+            // while the callback's registration may still refer to it.
+            free(region_base);
+            errno = ENOMEM;
+            return NULL;
+        }
+    }
+
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    // NOTE(review): each bucket's list head is overwritten rather than
+    // chained, while idle_size keeps accumulating; a node still linked in a
+    // bucket at this point is dropped. The buckets are also written without
+    // taking their per-bucket locks here. Fixing either requires changes in
+    // the caller's locking, so the original behavior is kept - confirm.
+    const size_t per_bucket = region_size / g_buckets;
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * per_bucket);
+        node[i]->len = per_bucket;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+// Return true when value lies in [min_value, max_value]; otherwise log a
+// warning that names the flag and return false.
+static bool CheckPoolFlagRange(const char* name, int32_t value,
+                               int32_t min_value, int32_t max_value) {
+    if (value >= min_value && value <= max_value) {
+        return true;
+    }
+    LOG(WARNING) << name << "(" << value << ") not in ["
+                 << min_value << "," << max_value << "]!";
+    return false;
+}
+
+// Initialize the block pool and create its first region.
+//
+// cb is stored as the callback ExtendBlockPool invokes for every new
+// region. The pool flags are validated first, so a rejected flag leaves the
+// pool untouched and initialization can be retried after fixing it.
+//
+// Returns the base address of the initial region, or NULL on failure with
+//   errno == EINVAL  cb is NULL, the pool is already initialized, or a flag
+//                    is out of range;
+//   errno == ENOMEM  a bookkeeping allocation failed;
+//   otherwise as reported by ExtendBlockPool.
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+
+    // Only the first offending flag is reported, in this order.
+    if (!CheckPoolFlagRange("rdma_memory_pool_max_regions",
+                            FLAGS_rdma_memory_pool_max_regions,
+                            RDMA_MEMORY_POOL_MIN_REGIONS,
+                            RDMA_MEMORY_POOL_MAX_REGIONS) ||
+        !CheckPoolFlagRange("rdma_memory_pool_initial_size_mb",
+                            FLAGS_rdma_memory_pool_initial_size_mb,
+                            RDMA_MEMORY_POOL_MIN_SIZE,
+                            RDMA_MEMORY_POOL_MAX_SIZE) ||
+        !CheckPoolFlagRange("rdma_memory_pool_increase_size_mb",
+                            FLAGS_rdma_memory_pool_increase_size_mb,
+                            RDMA_MEMORY_POOL_MIN_SIZE,
+                            RDMA_MEMORY_POOL_MAX_SIZE) ||
+        !CheckPoolFlagRange("rdma_memory_pool_buckets",
+                            FLAGS_rdma_memory_pool_buckets,
+                            RDMA_MEMORY_POOL_MIN_BUCKETS,
+                            RDMA_MEMORY_POOL_MAX_BUCKETS)) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    // Mark the pool as initialized only after the flags have been accepted.
+    g_cb = cb;
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        // resize() either succeeds or throws, so no size check is needed.
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        g_info->lock[i].resize(g_buckets, NULL);
+        g_info->idle_size[i].resize(g_buckets, 0);
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                errno = ENOMEM;
+                return NULL;
+            }
+        }
+    }
+
+    // Same non-throwing allocation style as the rest of this function.
+    g_dump_mutex = new (std::nothrow) butil::Mutex;
+    g_tls_info_mutex = new (std::nothrow) butil::Mutex;
+    if (!g_dump_mutex || !g_tls_info_mutex) {
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,

Review Comment:
   
设立多个bucket还是希望尽量减少相互的竞争。考虑到本身各个bucket随机使用,相对比较平均,一个bucket没有空间了,则大概率其他bucket也快没有了,这时不如直接extend出来。从实际使用建议上看,不建议依赖中途extend的机制,这个会引发Lkey的动态查找。所以暂时也没有太多优化这里。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to