Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939476496


##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in 
tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB 
};
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / 
g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << 
"MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / 
g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
+                                 block_type)) {
+                LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
+                                        << "You can set the size of memory 
pool larger. "
+                                        << "Refer to the help message of these 
flags: "
+                                        << "rdma_memory_pool_initial_size_mb, "
+                                        << "rdma_memory_pool_increase_size_mb, 
"
+                                        << "rdma_memory_pool_max_regions.";
+                if (locked) {
+                    g_dump_mutex->unlock();
+                }
+                return NULL;
+            }
+            node = g_info->idle_list[block_type][index];
+        }
+    }
+    if (node) {
+        ptr = node->start;
+        if (node->len > g_block_size[block_type]) {
+            node->start = (char*)node->start + g_block_size[block_type];
+            node->len -= g_block_size[block_type];
+        } else {
+            g_info->idle_list[block_type][index] = node->next;
+            butil::return_object<IdleNode>(node);
+        }
+        g_info->idle_size[block_type][index] -= g_block_size[block_type];
+    } else {
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return NULL;
+    }
+
+    // Move more blocks from global list to tls list
+    if (block_type == 0) {
+        node = g_info->idle_list[0][index];
+        tls_idle_list = node;

Review Comment:
   这个block pool是为了做block分配前的thread local。iobuf里面的tls是分配出来以后的thread 
local。实际上iobuf里面不少操作并没有用到里面的tls cache。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to