wwbmmm commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r935457002


##########
src/brpc/server.cpp:
##########
@@ -701,6 +703,28 @@ static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
     return true;
 }
 
+#if BRPC_WITH_RDMA
+static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
+    if (opt->rtmp_service) {
+        LOG(WARNING) << "RTMP is not supported by RDMA";
+        return false;
+    }
+    if (opt->has_ssl_options()) {
+        LOG(WARNING) << "SSL is not supported by RDMA";
+        return false;
+    }
+    if (opt->nshead_service) {

Review Comment:
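   http_master_service是不是也不支持?
   (Is `http_master_service` also unsupported over RDMA? If so, `OptionsAvailableOverRdma` should reject it too, alongside the existing `rtmp_service`, SSL and `nshead_service` checks.)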



##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in 
tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB 
};
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / 
g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << 
"MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / 
g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
+                                 block_type)) {
+                LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
+                                        << "You can set the size of memory 
pool larger. "
+                                        << "Refer to the help message of these 
flags: "
+                                        << "rdma_memory_pool_initial_size_mb, "
+                                        << "rdma_memory_pool_increase_size_mb, 
"
+                                        << "rdma_memory_pool_max_regions.";
+                if (locked) {
+                    g_dump_mutex->unlock();
+                }
+                return NULL;
+            }
+            node = g_info->idle_list[block_type][index];
+        }
+    }
+    if (node) {
+        ptr = node->start;
+        if (node->len > g_block_size[block_type]) {
+            node->start = (char*)node->start + g_block_size[block_type];
+            node->len -= g_block_size[block_type];
+        } else {
+            g_info->idle_list[block_type][index] = node->next;
+            butil::return_object<IdleNode>(node);
+        }
+        g_info->idle_size[block_type][index] -= g_block_size[block_type];
+    } else {
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return NULL;
+    }
+
+    // Move more blocks from global list to tls list
+    if (block_type == 0) {
+        node = g_info->idle_list[0][index];
+        tls_idle_list = node;

Review Comment:
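   iobuf内部就有一个tls的block缓存,这里的tls_idle_list有什么不一样吗?
   (IOBuf already keeps a thread-local block cache internally; how is `tls_idle_list` here different from it?)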



##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in 
tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB 
};
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / 
g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << 
"MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / 
g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,

Review Comment:
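   在当前bucket上找不到,为什么不换个bucket再找找,而是直接Extend?
   (When the current bucket has no idle block, why call `ExtendBlockPool` right away instead of first trying the other buckets? The number of regions is capped by `rdma_memory_pool_max_regions`, so extending while other buckets still hold idle blocks can exhaust the pool early.)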



##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in 
tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);

Review Comment:
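   这个是不是定义在头文件中比较合适?
   另外,这个命名不是很容易让人理解这个callback的作用
   (Would this typedef be better placed in the header? Also, the name `Callback` does not convey its purpose. Per the comment above it, the callback registers a newly extended memory region and returns its id (the lkey, with 0 meaning failure). A more descriptive name would help.)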



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, 
int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, 
ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive 
zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: 
"
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {

Review Comment:
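   这个叫HelloMessage可能会让人误以为是测试代码
   (Naming this `HelloMessage` may make readers mistake it for test/example code; a name describing the connection-setup fields it carries would be clearer.)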



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, 
int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, 
ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive 
zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: 
"
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data);
+    void Deserialize(void* data);
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint16_t block_size;
+    uint16_t sq_size;
+    uint16_t rq_size;
+    uint16_t lid;
+    ibv_gid gid;
+    uint32_t qp_num;
+};
+
+void HelloMessage::Serialize(void* data) {
+    // Note serialization does include magic str
+    memcpy(data, MAGIC_STR, 4);

Review Comment:
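   deserialize是不含这个MAGIC_STR的,所以serialize也不包含是不是更合理?
   (Deserialize does not handle MAGIC_STR, so wouldn't it be more consistent for Serialize to leave it out as well?)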



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to