Copilot commented on code in PR #3144:
URL: https://github.com/apache/brpc/pull/3144#discussion_r2507709241
##########
src/brpc/rdma/rdma_helper.cpp:
##########
@@ -93,7 +96,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device
used "
"(Empty means using the first active device)");
DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always
1.");
DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the
last one.");
-
+DEFINE_int32(gpu_index, 0, "The GPU device index to use.In GDR, we suggest to
use the GPU that is connected to the same PCIe switch with rdma devices");
Review Comment:
Missing space after period in the flag description. Should be 'use. In GDR'
instead of 'use.In GDR'.
```suggestion
DEFINE_int32(gpu_index, 0, "The GPU device index to use. In GDR, we suggest
to use the GPU that is connected to the same PCIe switch with rdma devices");
```
##########
src/butil/iobuf.cpp:
##########
@@ -1317,6 +1373,59 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos)
const {
return n - m;
}
+#if BRPC_WITH_GDR
+size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const {
+ if (n == 0) {
+ return 0;
+ }
+ const size_t nref = _ref_num();
+ // Skip `pos' bytes. `offset' is the starting position in starting
BlockRef.
+ size_t offset = pos;
+ size_t i = 0;
+ for (; offset != 0 && i < nref; ++i) {
+ IOBuf::BlockRef const& r = _ref_at(i);
+ if (offset < (size_t)r.length) {
+ break;
+ }
+ offset -= r.length;
+ }
+
+ butil::gdr::GPUStreamPool* gpu_stream_pool =
butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool();
+ struct timespec start, end;
+ clock_gettime(CLOCK_MONOTONIC, &start);
Review Comment:
The timing variables `start` and `end` are declared and used but the
computed `time_us` value is only for commented-out logging. If timing is not
actively needed, consider removing this instrumentation code.
##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -922,31 +933,57 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.imm_data = butil::HostToNet32(imm);
wr.send_flags |= IBV_SEND_SOLICITED;
wr.send_flags |= IBV_SEND_SIGNALED;
+ wr.wr_id = FIXED_ACK_WR_ID;
ibv_send_wr* bad = NULL;
int err = ibv_post_send(_resource->qp, &wr, &bad);
if (err != 0) {
// We use other way to guarantee the Send Queue is not full.
// So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+ << ", window_size:" << _window_size
+ << ", emote_recv_window: " << _remote_recv_window;
Review Comment:
Corrected spelling of 'emote_recv_window' to 'remote_recv_window'.
```suggestion
<< ", remote_recv_window: " << _remote_recv_window;
```
##########
src/butil/iobuf.cpp:
##########
@@ -722,6 +723,52 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) {
return saved_n;
}
+#if BRPC_WITH_GDR
+size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) {
+ if (n == 0) {
+ return 0;
+ }
+
+ butil::gdr::BlockPoolAllocator* host_allocator =
butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator();
+ bool alloc_from_host_alloc = (n <= host_allocator->get_block_size());
+ void* mem = NULL;
+ if (alloc_from_host_alloc) {
+ mem = host_allocator->AllocateRaw(n);
+ } else {
+ mem = malloc(n);
+ }
+
+ if (mem == NULL) {
+ return -1;
Review Comment:
The return type of `cutn_from_gpu` is `size_t` (unsigned), but the function
returns `-1` on error. This will be converted to a large positive value
(`SIZE_MAX`). Consider returning `0` to indicate failure, or document that
callers should check for this specific value.
```suggestion
return 0;
```
##########
src/butil/gpu/gpu_block_pool.h:
##########
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef BUTIL_GPU_GPU_BLOCK_POOL_H
+#define BUTIL_GPU_GPU_BLOCK_POOL_H
+
+#if BRPC_WITH_GDR
+
+#include <infiniband/verbs.h>
+#include <sys/types.h>
+#include <stdint.h>
+#include <linux/types.h>
+#include <string>
+#include <vector>
+#include <mutex>
+#include <infiniband/verbs.h>
+#include "butil/containers/hash_tables.h"
+#include "butil/logging.h"
+#include <cuda_runtime.h>
+#include "cuda.h"
+
+// #include "gdrapi.h"
+namespace butil {
+namespace gdr {
+
+static int gdr_block_size_kb = [](){
+ int ret = 64;
+ const char* env_var_val = getenv("GDR_BLOCK_SIZE_KB");
+ if (env_var_val == nullptr) {
+ return ret;
+ }
+ ret = std::stoi(env_var_val);
+
+ return ret;
+}();
+
+void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size);
+void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size);
+
+bool InitGPUBlockPool(int gpu_id, ibv_pd* pd);
+
+struct Region {
+ Region() { start = 0; aligned_start = 0;}
+ uintptr_t start;
+ uintptr_t aligned_start;
+
+ size_t size;
+ size_t aligned_size;
+ size_t blockCount;
+ struct ibv_mr *mr {nullptr};
+ uint32_t lkey;
+};
+
+struct BlockHeader {
+ BlockHeader() { addr = nullptr; next = nullptr;}
+ void Reset() { addr = nullptr; next = nullptr; }
+ void* addr;
+ BlockHeader* next;
+};
+
+class BlockPoolAllocator {
+ private:
+ int gpu_id;
+ bool on_gpu;
+ ibv_pd* pd {nullptr};
+
+ const size_t BLOCK_SIZE;
+ const size_t REGION_SIZE;
+
+ BlockHeader* freeList;
+ static constexpr size_t max_regions = 16;
+ int g_region_num {0};
+ Region g_regions[max_regions];
+ std::mutex poolMutex;
+
+ // 统计信息
+ size_t totalAllocated;
+ size_t totalDeallocated;
+ size_t peakUsage;
+
+ public:
+ explicit BlockPoolAllocator(int gpu_id,
+ bool on_gpu, ibv_pd* pd,
+ size_t blockSize, size_t regionSize);
+
+ ~BlockPoolAllocator();
+
+ void* AllocateRaw(size_t num_bytes);
+
+ void DeallocateRaw(void* ptr);
+
+ // 获取统计信息
+ void printStatistics() const;
+
+ int64_t getCurrentUsage() const {
+ return totalAllocated - totalDeallocated;
+ }
+
+ int64_t getTotalMemory() const {
+ return g_region_num * REGION_SIZE;
+ }
+
+ int64_t get_block_size() const {
+ return BLOCK_SIZE;
+ }
+
+ uint32_t get_lkey(const void* buf);
+
+ private:
+ Region* GetRegion(const void* buf);
+ void extendRegion();
+};
+
+class GPUStreamPool {
+public:
+ explicit GPUStreamPool(int gpu_id);
+
+ ~GPUStreamPool();
+
+ GPUStreamPool(const GPUStreamPool&) = delete;
+ GPUStreamPool& operator=(const GPUStreamPool&) = delete;
+
+ void fast_d2h(std::vector<void*>& src_list, std::vector<int64_t>&
length_list, void* dst);
+
+ void fast_d2d(std::vector<void*>& src_list, std::vector<int64_t>&
length_list, void* dst);
+
+ static constexpr int kMaxConcurrent = 32;
+private:
+ int gpu_id_ {-1};
+ std::atomic<int64_t> d2h_cnt_ {0};
+ std::atomic<int64_t> d2d_cnt_ {0};
+ std::mutex d2h_locks_[kMaxConcurrent];
+ std::mutex d2d_locks_[kMaxConcurrent];
+ std::mutex d2h_lb_lock_;
+ std::mutex d2d_lb_lock_;
+ std::vector<cudaStream_t> d2h_streams_;
+ std::vector<cudaStream_t> d2d_streams_;
+};
+
+class BlockPoolAllocators {
+public:
+ static BlockPoolAllocators* singleton();
+ BlockPoolAllocators() {}
+ virtual ~BlockPoolAllocators() {
+ CHECK_EQ(this, instance_);
+ instance_ = nullptr;
+ }
+
+ void init(int gpu_id, ibv_pd* pd) {
+ LOG(INFO) << "set GPU BlockPoolAllocator for " << gpu_id;
+ size_t region_size = 512LL * 1024 * 1024;
+ size_t block_size = gdr_block_size_kb * 1024;
+ gpu_mem_alloc = new BlockPoolAllocator(gpu_id, true, pd, block_size,
region_size);
+
+ region_size = 32LL * 1024 * 1024;
+ block_size = 512;
+ cpu_mem_alloc = new BlockPoolAllocator(gpu_id, false, pd, block_size,
region_size);
+
+ gpu_stream_pool = new GPUStreamPool(gpu_id);
+ }
+
+ BlockPoolAllocator* get_gpu_allocator() {
+ return gpu_mem_alloc;
+ }
+
+ BlockPoolAllocator* get_cpu_allocator() {
+ return cpu_mem_alloc;
+ }
+
+ GPUStreamPool* get_gpu_stream_pool() {
+ return gpu_stream_pool;
+ }
+
+public:
+ static BlockPoolAllocators* instance_;
+
+private:
+ BlockPoolAllocator* gpu_mem_alloc {nullptr};
+ BlockPoolAllocator* cpu_mem_alloc {nullptr};
+ GPUStreamPool* gpu_stream_pool {nullptr};
+};
+
Review Comment:
[nitpick] Missing blank line before closing namespace braces is inconsistent
with the opening namespace declaration style (lines 36-37). Consider adding
blank lines for consistency.
```suggestion
```
##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -883,6 +891,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from,
size_t ndata) {
// We use other way to guarantee the Send Queue is not full.
// So we just consider this error as an unrecoverable error.
LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+ << ", window_size:" << _window_size
+ << ", emote_recv_window: " << _remote_recv_window
Review Comment:
Corrected spelling of 'emote_recv_window' to 'remote_recv_window'.
```suggestion
<< ", remote_recv_window: " << _remote_recv_window
```
##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -958,24 +995,10 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
}
if (wc.imm_data > 0) {
- // Clear sbuf here because we ignore event wakeup for send
completions
uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
-
- // Update window
uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >=
wnd_thresh
- || acks >= wnd_thresh) {
- // Do not wake up writing thread right after _window_size > 0.
- // Otherwise the writing thread may switch to background too
quickly.
+ if(_remote_recv_window.fetch_add(acks,
butil::memory_order_release) >= wnd_thresh ||
Review Comment:
[nitpick] Missing space after 'if' keyword. Should be 'if (' instead of
'if(' per standard C++ formatting conventions.
```suggestion
if (_remote_recv_window.fetch_add(acks,
butil::memory_order_release) >= wnd_thresh ||
```
##########
src/butil/iobuf.cpp:
##########
@@ -1317,6 +1373,59 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos)
const {
return n - m;
}
+#if BRPC_WITH_GDR
+size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const {
+ if (n == 0) {
+ return 0;
+ }
+ const size_t nref = _ref_num();
+ // Skip `pos' bytes. `offset' is the starting position in starting
BlockRef.
+ size_t offset = pos;
+ size_t i = 0;
+ for (; offset != 0 && i < nref; ++i) {
+ IOBuf::BlockRef const& r = _ref_at(i);
+ if (offset < (size_t)r.length) {
+ break;
+ }
+ offset -= r.length;
+ }
+
+ butil::gdr::GPUStreamPool* gpu_stream_pool =
butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool();
+ struct timespec start, end;
+ clock_gettime(CLOCK_MONOTONIC, &start);
+ size_t m = n;
+ std::vector<void*> src_list;
+ std::vector<int64_t> length_list;
+ for (; m != 0 && i < nref; ++i) {
+ IOBuf::BlockRef const& r = _ref_at(i);
+ const size_t nc = std::min(m, (size_t)r.length - offset);
+ void* gpu_src = r.block->data + r.offset + offset;
+ // cudaMemcpy(d, gpu_src, nc, cudaMemcpyDeviceToDevice);
+ src_list.push_back(gpu_src);
+ length_list.push_back(nc);
+ //cuMemcpyDtoH(d, (CUdeviceptr)(r.block->data + r.offset + offset),
nc);
+ // gdr_copy_from_mapping(allocator->mh(), d,
allocator->ToCPUPtr(gpu_src), nc);
+ offset = 0;
+ // d = (char*)d + nc;
+ m -= nc;
+ }
+ if (to_gpu) {
+ gpu_stream_pool->fast_d2d(src_list, length_list, d);
+ } else {
+ gpu_stream_pool->fast_d2h(src_list, length_list, d);
+ }
+ clock_gettime(CLOCK_MONOTONIC, &end);
+ double time_us = (end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec -
start.tv_nsec) / 1e3;
+ size_t copied_bytes = n - m;
+
+ // LOG(INFO) << "GDRCopy: " << copied_bytes << " bytes, "
+ // << time_us << " us" << ", to_gpu " << to_gpu;
Review Comment:
The variables `time_us` and `copied_bytes` are computed but only used in
commented-out log statements (lines 1421-1422). Consider removing this unused
code or uncommenting the logging if it's needed for debugging.
```suggestion
```
##########
src/butil/gpu/gpu_block_pool.cpp:
##########
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_GDR
+
+#include <iostream>
+#include <chrono>
+#include "butil/fast_rand.h"
+#include "gpu_block_pool.h"
+namespace butil {
+namespace gdr {
+
+#define CHECK_CUDA(call) \
+do { \
+ auto _sts = (call); \
+ if (_sts != cudaSuccess) { \
+ LOG(FATAL) << " cuda error:" \
+ << (cudaGetErrorString(_sts)) << std::string(" at ") \
+ << __FILE__ << ": " << __LINE__; \
+ } \
+} while (0);
+
+bool verify_same_context() {
+ static int original_device = -1;
+ static bool first_call = true;
+
+ int current_device;
+ cudaGetDevice(¤t_device);
+
+ if (first_call) {
+ original_device = current_device;
+ first_call = false;
+ return true;
+ }
+
+ return (current_device == original_device);
+}
+
+void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size) {
+ CHECK_CUDA(cudaSetDevice(gpu_id));
+ void *d_data;
+
+ LOG(INFO) << "try to alloc " << gpu_mem_size << " bytes from gpu " <<
gpu_id;
+
+ CHECK_CUDA(cudaMalloc(&d_data, gpu_mem_size));
+ cudaDeviceSynchronize();
+ return (void *)d_data;
+}
+
+void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size) {
+ CHECK_CUDA(cudaSetDevice(gpu_id));
+
+ LOG(INFO) << "try to alloc " << cpu_mem_size << " bytes from gpu " <<
gpu_id << "on host";
+
+ void* mem = NULL;
+
+ CHECK_CUDA(cudaMallocHost(&mem, cpu_mem_size));
+
+ cudaDeviceSynchronize();
+
+ return mem;
+}
+
+
+BlockPoolAllocators* BlockPoolAllocators::instance_ = nullptr;
+
+BlockPoolAllocators* BlockPoolAllocators::singleton() {
+ static std::mutex mutex;
+ if (instance_ == nullptr) {
+ std::lock_guard<std::mutex> l(mutex);
+ if(instance_ == nullptr) {
Review Comment:
[nitpick] Missing space after 'if' keyword. Should be 'if (' instead of
'if(' per standard C++ formatting conventions.
```suggestion
if (instance_ == nullptr) {
```
##########
src/butil/iobuf.cpp:
##########
@@ -722,6 +723,52 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) {
return saved_n;
}
+#if BRPC_WITH_GDR
+size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) {
+ if (n == 0) {
+ return 0;
+ }
+
+ butil::gdr::BlockPoolAllocator* host_allocator =
butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator();
+ bool alloc_from_host_alloc = (n <= host_allocator->get_block_size());
+ void* mem = NULL;
+ if (alloc_from_host_alloc) {
+ mem = host_allocator->AllocateRaw(n);
+ } else {
+ mem = malloc(n);
+ }
+
+ if (mem == NULL) {
+ return -1;
+ }
+ struct timespec start, end;
+ clock_gettime(CLOCK_MONOTONIC, &start);
+ size_t saved_n = copy_from_gpu(mem, n, 0, false);
+ if (saved_n > 0) {
+ if (alloc_from_host_alloc) {
+ auto deleter = [host_allocator](void* data) {
host_allocator->DeallocateRaw(data); };
+ out->append_user_data(mem, saved_n, deleter);
+ } else {
+ auto deleter = [](void* data) { free(data); };
+ out->append_user_data(mem, saved_n, deleter);
+ }
+ pop_front(saved_n);
+ } else {
+ if (alloc_from_host_alloc) {
+ host_allocator->DeallocateRaw(mem);
+ } else {
+ free(mem);
+ }
+ }
+ clock_gettime(CLOCK_MONOTONIC, &end);
+ double time_us = (end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec -
start.tv_nsec) / 1e3;
Review Comment:
The variables `start`, `end`, and `time_us` are defined and computed but the
result is only used in a commented-out log statement (lines 766-767). Consider
removing this timing code if it's not being used, or uncomment the logging if
it's needed for debugging.
##########
src/butil/iobuf.cpp:
##########
@@ -722,6 +723,52 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) {
return saved_n;
}
+#if BRPC_WITH_GDR
+size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) {
+ if (n == 0) {
+ return 0;
+ }
+
+ butil::gdr::BlockPoolAllocator* host_allocator =
butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator();
+ bool alloc_from_host_alloc = (n <= host_allocator->get_block_size());
+ void* mem = NULL;
+ if (alloc_from_host_alloc) {
+ mem = host_allocator->AllocateRaw(n);
+ } else {
+ mem = malloc(n);
+ }
+
+ if (mem == NULL) {
+ return -1;
+ }
+ struct timespec start, end;
+ clock_gettime(CLOCK_MONOTONIC, &start);
Review Comment:
The timing variables `start` and `end` are declared and used but the
computed `time_us` value is only for commented-out logging. If timing is not
actively needed, consider removing this instrumentation code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]