Copilot commented on code in PR #3196:
URL: https://github.com/apache/brpc/pull/3196#discussion_r2749143558


##########
example/benchmark_fb/test.brpc.fb.cpp:
##########
@@ -0,0 +1,72 @@
+// Generated by the BRPC C++ plugin.
+// If you make any local change, they will be lost.
+// source: test.fbs
+#include "test.brpc.fb.h"
+#include <iostream>
+
+namespace test {
+
+static brpc::flatbuffers::ServiceDescriptor* 
file_level_service_descriptors_my_2eproto[1] = {NULL};

Review Comment:
   Potential memory leak: The static array 
file_level_service_descriptors_my_2eproto[0] is allocated in 
parse_service_descriptors but never deallocated. This creates a permanent 
memory leak. Consider using a smart pointer or implementing proper cleanup, 
perhaps via an atexit handler or a static destructor pattern to ensure the 
ServiceDescriptor is properly deleted at program exit.
   ```suggestion
   static brpc::flatbuffers::ServiceDescriptor* 
file_level_service_descriptors_my_2eproto[1] = {NULL};
   
   // Ensure the statically allocated ServiceDescriptor is cleaned up at 
program exit.
   struct FileLevelServiceDescriptorsCleanup {
       ~FileLevelServiceDescriptorsCleanup() {
           if (file_level_service_descriptors_my_2eproto[0] != NULL) {
               delete file_level_service_descriptors_my_2eproto[0];
               file_level_service_descriptors_my_2eproto[0] = NULL;
           }
       }
   };
   
   static FileLevelServiceDescriptorsCleanup 
file_level_service_descriptors_cleanup;
   ```



##########
src/brpc/details/flatbuffers_impl.h:
##########
@@ -0,0 +1,297 @@
+#ifndef BRPC_FLATBUFFERS_IMPL_H_
+#define BRPC_FLATBUFFERS_IMPL_H_

Review Comment:
   Missing Apache License header. All source files in brpc should include the 
standard Apache License 2.0 header comment block at the top of the file, as 
seen in other files like http_message.h. This is required for proper licensing 
compliance.



##########
example/benchmark_fb/test.brpc.fb.cpp:
##########
@@ -0,0 +1,72 @@
+// Generated by the BRPC C++ plugin.
+// If you make any local change, they will be lost.
+// source: test.fbs
+#include "test.brpc.fb.h"
+#include <iostream>
+
+namespace test {
+
+static brpc::flatbuffers::ServiceDescriptor* 
file_level_service_descriptors_my_2eproto[1] = {NULL};
+BenchmarkService::~BenchmarkService() {}
+const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() {
+    if (file_level_service_descriptors_my_2eproto[0] == NULL) {
+        const brpc::flatbuffers::BrpcDescriptorTable desc_table = {
+        "test.", "BenchmarkService" , "Test"};
+        if (brpc::flatbuffers::parse_service_descriptors(desc_table, 
&file_level_service_descriptors_my_2eproto[0])) {
+            std::cout << "ERROR: " << "Fail to parse_service_descriptors" << 
std::endl;
+            return NULL;
+        }
+    }

Review Comment:
   Thread safety issue: The descriptor() method has a race condition. Multiple 
threads could simultaneously check if 
file_level_service_descriptors_my_2eproto[0] is NULL and call 
parse_service_descriptors concurrently, leading to memory leaks and undefined 
behavior. This should be protected with a mutex or use std::call_once for 
thread-safe lazy initialization. This is a code generation issue that should be 
addressed in the flatc code generator.
   ```suggestion
   #include <mutex>
   
   namespace test {
   
   static brpc::flatbuffers::ServiceDescriptor* 
file_level_service_descriptors_my_2eproto[1] = {NULL};
   static std::once_flag file_level_service_descriptors_my_2eproto_once_flag;
   BenchmarkService::~BenchmarkService() {}
   const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() {
       std::call_once(file_level_service_descriptors_my_2eproto_once_flag, []() 
{
           const brpc::flatbuffers::BrpcDescriptorTable desc_table = {
               "test.", "BenchmarkService" , "Test"};
           if (brpc::flatbuffers::parse_service_descriptors(
                   desc_table, &file_level_service_descriptors_my_2eproto[0])) {
               std::cout << "ERROR: " << "Fail to parse_service_descriptors" << 
std::endl;
           }
       });
   ```



##########
example/benchmark_fb/server.cpp:
##########
@@ -0,0 +1,74 @@
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include "test.brpc.fb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8080, "TCP Port of this server");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
+DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");
+
+namespace test{
+class BenchmarkServiceImpl : public BenchmarkService {
+public:
+    BenchmarkServiceImpl() {}
+    ~BenchmarkServiceImpl() {}
+
+    void Test(google::protobuf::RpcController* controller,
+                const brpc::flatbuffers::Message* request_base,
+                brpc::flatbuffers::Message* response,
+                google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl =
+            static_cast<brpc::Controller*>(controller);
+        const test::BenchmarkRequest* request = 
request_base->GetRoot<test::BenchmarkRequest>();
+        // Set Response Message
+        brpc::flatbuffers::MessageBuilder mb_;
+        const char *req_str = request->message()->c_str();

Review Comment:
   Potential null pointer dereference: The request->message() call on line 29 
can return nullptr if the message field is not set in the FlatBuffers message. 
Calling c_str() on a nullptr will cause a crash. Add a null check before 
accessing the string, or handle the case where message is not present.
   ```suggestion
           const auto* msg = request->message();
           const char* req_str = msg ? msg->c_str() : "";
   ```



##########
example/benchmark_fb/server.cpp:
##########
@@ -0,0 +1,74 @@
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include "test.brpc.fb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8080, "TCP Port of this server");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
+DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");
+
+namespace test{
+class BenchmarkServiceImpl : public BenchmarkService {
+public:
+    BenchmarkServiceImpl() {}
+    ~BenchmarkServiceImpl() {}
+
+    void Test(google::protobuf::RpcController* controller,
+                const brpc::flatbuffers::Message* request_base,
+                brpc::flatbuffers::Message* response,
+                google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl =
+            static_cast<brpc::Controller*>(controller);
+        const test::BenchmarkRequest* request = 
request_base->GetRoot<test::BenchmarkRequest>();
+        // Set Response Message
+        brpc::flatbuffers::MessageBuilder mb_;
+        const char *req_str = request->message()->c_str();
+        auto message = mb_.CreateString(req_str);
+        auto resp = test::CreateBenchmarkResponse(mb_, request->opcode(),
+                    request->echo_attachment(), request->attachment_size(),
+                    request->request_id(),request->reserved(), message);
+        mb_.Finish(resp);
+        *response = mb_.ReleaseMessage();
+        if (FLAGS_echo_attachment) {
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+};
+
+}
+
+int main(int argc, char* argv[]) {
+    GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // Instance of your service.
+    test::BenchmarkServiceImpl benchmark_service_impl;
+
+    if (server.AddService(&benchmark_service_impl, 
+                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // Start the server. 
+    brpc::ServerOptions options;
+    options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    options.max_concurrency = FLAGS_max_concurrency;
+    options.internal_port = FLAGS_internal_port;
+
+    if (server.Start(FLAGS_port, &options) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";

Review Comment:
   Inconsistent error message: The error message says "Fail to start 
EchoServer" but this is BenchmarkServer, not EchoServer. The message should be 
updated to "Fail to start BenchmarkServer" or simply "Fail to start server" for 
consistency.
   ```suggestion
           LOG(ERROR) << "Fail to start server";
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>

Review Comment:
   Missing Apache License header. All source files in brpc should include the 
standard Apache License 2.0 header comment block at the top of the file. This 
is required for proper licensing compliance.



##########
src/brpc/details/flatbuffers_impl.h:
##########
@@ -0,0 +1,297 @@
+#ifndef BRPC_FLATBUFFERS_IMPL_H_
+#define BRPC_FLATBUFFERS_IMPL_H_
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string>
+#include <flatbuffers/flatbuffers.h>
+#include "butil/iobuf.h"
+#include "butil/single_iobuf.h"
+#include "brpc/details/flatbuffers_common.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+class MessageBuilder;
+
+// Custom allocator for FlatBuffers that uses IOBuf as underlying storage.
+// This allocator manages memory allocation for FlatBuffers messages within
+// brpc's zero-copy buffer system, enabling efficient serialization and
+// deserialization without unnecessary memory copies.
+class SlabAllocator : public ::flatbuffers::Allocator {
+public:
+    SlabAllocator() {}

Review Comment:
   Uninitialized member variables: The SlabAllocator class has member variables 
_full_buf_head, _fb_begin_head, and _old_size_param that are not initialized in 
the default constructor. These should be initialized to safe default values 
(e.g., nullptr for pointers, 0 for size_t) to prevent undefined behavior if 
methods are called before allocate() is invoked.
   ```suggestion
       SlabAllocator()
           : _full_buf_head(nullptr)
           , _fb_begin_head(nullptr)
           , _old_size_param(0) {}
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;
+}
+
+void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t 
*new_p,
+                    size_t new_size, size_t in_use_back,
+                    size_t in_use_front) {
+    memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back,
+        in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+int ServiceDescriptor::init(const BrpcDescriptorTable& table) {
+    if (table.service_name == "" || table.prefix == "" ||
+                            table.method_name_list == "") {

Review Comment:
   String comparison issue: Comparing std::string objects with "" using == is 
correct but using empty() method is more idiomatic and clearer. Consider 
changing 'table.service_name == ""' to 'table.service_name.empty()' (and 
similarly for prefix and method_name_list) for better code clarity and style 
consistency.
   ```suggestion
       if (table.service_name.empty() || table.prefix.empty() ||
                               table.method_name_list.empty()) {
   ```



##########
src/brpc/details/flatbuffers_impl.h:
##########
@@ -0,0 +1,297 @@
+#ifndef BRPC_FLATBUFFERS_IMPL_H_
+#define BRPC_FLATBUFFERS_IMPL_H_
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string>
+#include <flatbuffers/flatbuffers.h>
+#include "butil/iobuf.h"
+#include "butil/single_iobuf.h"
+#include "brpc/details/flatbuffers_common.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+class MessageBuilder;
+
+// Custom allocator for FlatBuffers that uses IOBuf as underlying storage.
+// This allocator manages memory allocation for FlatBuffers messages within
+// brpc's zero-copy buffer system, enabling efficient serialization and
+// deserialization without unnecessary memory copies.
+class SlabAllocator : public ::flatbuffers::Allocator {
+public:
+    SlabAllocator() {}
+
+    SlabAllocator(const SlabAllocator &other) = delete;
+
+    SlabAllocator &operator=(const SlabAllocator &other) = delete;
+
+    SlabAllocator(SlabAllocator &&other) {
+        // default-construct and swap idiom
+        swap(other);
+    }
+
+    SlabAllocator &operator=(SlabAllocator &&other) {
+    // move-construct and swap idiom
+        SlabAllocator temp(std::move(other));
+        swap(temp);
+        return *this;
+    }
+
+    void swap(SlabAllocator &other) {
+        _iobuf.swap(other._iobuf);
+    }
+
+    virtual ~SlabAllocator() {}
+    /*
+    * Allocate memory from the slab allocator.
+    * buffer struct: fb header + fb message
+    */
+    virtual uint8_t *allocate(size_t size);
+
+    virtual void deallocate(uint8_t *p, size_t size) override {
+        if (p == _fb_begin_head) {
+            _iobuf.deallocate((void*)_full_buf_head);
+        }
+    }
+
+    void deallocate(void *p) {
+        _iobuf.deallocate(p);
+    }
+
+    virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
+                                        size_t new_size, size_t in_use_back,
+                                        size_t in_use_front);
+protected:
+    void memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t *new_p,
+                        size_t new_size, size_t in_use_back,
+                        size_t in_use_front);
+private:
+
+    butil::SingleIOBuf &get_iobuf() {
+        return _iobuf;
+    }
+    uint8_t *_full_buf_head;
+    uint8_t *_fb_begin_head;
+    size_t _old_size_param;
+    butil::SingleIOBuf _iobuf;
+    friend class MessageBuilder;
+};
+
+// SlabAllocatorMember is a hack to ensure that the MessageBuilder's
+// slab_allocator_ member is constructed before the FlatBufferBuilder, since
+// the allocator is used in the FlatBufferBuilder ctor.
+struct SlabAllocatorMember {
+  SlabAllocator slab_allocator_;
+};
+
+// Represents a FlatBuffers message in brpc's zero-copy buffer system.
+// This class wraps a FlatBuffers message stored in an IOBuf
+// The message is move-only and cannot be copied.
+class Message {
+public:
+    Message() : _meta_size(0), _msg_size(0) {}
+ 
+    Message &operator=(const Message &other) = delete;
+private:
+    Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size,
+            uint32_t msg_size);
+
+    Message(const butil::IOBuf::BlockRef &ref,
+                    uint32_t meta_size,
+                    uint32_t msg_size);
+    friend class MessageBuilder;
+public:
+    Message(Message &&other) = default;
+
+    Message(const Message &other) = delete; 
+
+    Message &operator=(Message &&other) = default;
+
+    void *mutable_data() const {
+        return (void *)const_cast<uint8_t *>(data());
+    }
+
+    const uint8_t *data() const {
+        const uint8_t *buf = (const uint8_t *)_iobuf.get_begin();

Review Comment:
   Missing null pointer check in data() method: The data() method calls 
_iobuf.get_begin() which can return NULL if the buffer is not initialized. When 
NULL is returned, adding _meta_size to NULL results in undefined behavior. 
Consider adding a null check or documenting the precondition that the buffer 
must be initialized before calling data().
   ```suggestion
           const uint8_t *buf = (const uint8_t *)_iobuf.get_begin();
           if (buf == NULL) {
               return NULL;
           }
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;
+}
+
+void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t 
*new_p,
+                    size_t new_size, size_t in_use_back,
+                    size_t in_use_front) {
+    memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back,
+        in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+int ServiceDescriptor::init(const BrpcDescriptorTable& table) {
+    if (table.service_name == "" || table.prefix == "" ||
+                            table.method_name_list == "") {
+        errno = EINVAL;
+        return -1;
+    }
+    std::vector<std::string> res;
+    std::string strs = table.method_name_list + METHOD_SPLIT;
+    size_t pos = strs.find(METHOD_SPLIT);
+    while(pos != strs.npos) {
+        std::string tmp = strs.substr(0, pos);
+        if (tmp != METHOD_SPLIT && tmp != "\n") {
+            res.push_back(tmp);
+        }
+        strs = strs.substr(pos + 1, strs.size());
+        pos = strs.find(METHOD_SPLIT);
+    }
+    method_count_ = res.size();
+    if (method_count_ <= 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    name_ = table.service_name;
+    full_name_ = table.prefix + std::string(PREFIX_SPLIT) + name_;
+    butil::MurmurHash3_x86_32(full_name_.c_str(), full_name_.size(), 1, 
&index_);
+    methods_ = new MethodDescriptor*[method_count_];
+    if (!methods_) {
+        return -1;
+    }
+    memset(methods_, 0, method_count_ * sizeof(MethodDescriptor*));
+    for (int i = 0; i < method_count_; ++i) {
+        methods_[i] = new MethodDescriptor(table.prefix.c_str(), 
res[i].c_str(), this, i);
+        if (!methods_[i]) {
+            release_descriptor();
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void ServiceDescriptor::release_descriptor() {
+    if (methods_ != NULL ) {
+        for (int i = 0; i < method_count_; ++i) {
+            if (methods_[i] != NULL) {
+                delete methods_[i];
+            }
+            methods_[i] = NULL;
+        }
+        delete methods_;
+        methods_ = NULL;
+    }
+}
+
+ServiceDescriptor::~ServiceDescriptor() {
+    release_descriptor();
+}
+
+const MethodDescriptor* ServiceDescriptor::method(int index) const {
+    if (index < 0 || index >= method_count_) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (!methods_) {
+        errno = EPERM;
+        return NULL;
+    }
+    return methods_[index];
+}
+
+MethodDescriptor::MethodDescriptor(const char* prefix,
+                            const char* name,
+                            const ServiceDescriptor* service,
+                            int index) {
+    name_ = name;
+    full_name_ = std::string(prefix) + std::string(PREFIX_SPLIT) + 
std::string(name);
+    service_ = service;
+    index_ = index;
+}
+
+Message MessageBuilder::ReleaseMessage() {
+    const uint8_t *msg_data = buf_.data();                  // pointer to msg
+    uint32_t msg_size = static_cast<uint32_t>(buf_.size());
+    butil::SingleIOBuf& iobuf = slab_allocator_.get_iobuf();
+    const uint8_t *buf_data = (const uint8_t *) iobuf.get_begin();
+    // Calculate offsets from the buffer start
+    // reserve the memory space for rpc header and meta
+    const uint8_t *data_with_rpc = msg_data - DEFAULT_RESERVE_SIZE;
+    uint32_t new_msg_size = msg_size;
+
+    new_msg_size += DEFAULT_RESERVE_SIZE;
+    uint32_t begin = data_with_rpc - buf_data;
+
+    const butil::IOBuf::BlockRef& ref = iobuf.get_cur_ref();
+    butil::IOBuf::BlockRef sub_ref = {ref.offset + begin, new_msg_size, 
ref.block};
+    Message msg(sub_ref, DEFAULT_RESERVE_SIZE, msg_size);
+    Reset();
+    return msg;
+}
+
+Message::Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)
+                    : _iobuf(iobuf)
+                    , _meta_size(meta_size)
+                    , _msg_size(msg_size) {
+    if (msg_size == 0) {
+        _msg_size = _iobuf.get_length() - meta_size;
+    }
+}
+
+Message::Message(const butil::IOBuf::BlockRef &ref,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)
+                    : _iobuf(ref)
+                    , _meta_size(meta_size)
+                    , _msg_size(msg_size) {
+    if (msg_size == 0) {
+        _msg_size = _iobuf.get_length() - meta_size;
+    }
+}
+
+bool Message::parse_msg_from_iobuf(const butil::IOBuf& buf, size_t msg_size, 
size_t meta_size) {
+    size_t buf_size = buf.size();
+    if (buf_size != (msg_size + meta_size)) {
+        return false;
+    }
+    if (!_iobuf.assign(buf, buf_size)) {
+        return false;
+    }
+    _meta_size = meta_size;
+    _msg_size = msg_size;
+
+    return true;
+}
+
+bool Message::append_msg_to_iobuf(butil::IOBuf& buf) {
+    _iobuf.append_to(&buf);
+    return true;
+}
+
+void *Message::reduce_meta_size_and_get_buf(uint32_t new_size) {
+    if (new_size < _meta_size) {
+        uint32_t off = _meta_size - new_size;
+        const butil::IOBuf::BlockRef& ref = _iobuf.get_cur_ref();
+        butil::IOBuf::BlockRef sub_ref = {ref.offset + off, ref.length - off, 
ref.block};
+        _iobuf = butil::SingleIOBuf(sub_ref);
+        _meta_size = new_size;
+        return const_cast<void*>(_iobuf.get_begin());
+    } else if (new_size == _meta_size) {
+        return const_cast<void*>(_iobuf.get_begin());
+    }
+    LOG(WARNING) << "You should change DEFAULT_RESERVE_SIZE, new_size=" << 
new_size << " > _meta_size=" << _meta_size;
+    return NULL;
+}
+
+int parse_service_descriptors(const BrpcDescriptorTable& descriptor_table,
+                    ServiceDescriptor** descriptor_out) {
+    if(descriptor_out == NULL) {
+        errno = EINVAL;
+        return -1;
+    }
+    ServiceDescriptor *out = new ServiceDescriptor;
+    if (!out) {
+        return -1;
+    }
+    int ret = out->init(descriptor_table);
+    if (ret < 0) {
+        delete out;
+        return -1;
+    }
+    *descriptor_out = out;
+    return 0;
+}
+
+bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, 
size_t meta_remaind) {
+    if (!msg || msg_size == 0 || meta_remaind > msg_size) {
+        return false;
+    }
+    return msg->parse_msg_from_iobuf(buf, msg_size, meta_remaind);
+}
+
+bool SerializeFbToIOBUF(const Message* msg, butil::IOBuf& buf) {
+    if (!msg) {
+        return false;
+    }
+    Message* msg_ptr = const_cast<Message*>(msg);
+    return msg_ptr->append_msg_to_iobuf(buf);
+}
+

Review Comment:
   Const-correctness violation: SerializeFbToIOBUF takes a const Message* but 
then casts away the const to call append_msg_to_iobuf. This breaks 
const-correctness. If append_msg_to_iobuf modifies the Message (which it 
appears to do by calling _iobuf.append_to), then SerializeFbToIOBUF should take 
a non-const Message* parameter. Alternatively, if append_msg_to_iobuf doesn't 
actually modify the Message, it should be marked const.
   ```suggestion
   bool SerializeFbToIOBUF(Message* msg, butil::IOBuf& buf) {
       if (!msg) {
           return false;
       }
       return msg->append_msg_to_iobuf(buf);
   }
   ```



##########
src/brpc/details/flatbuffers_impl.h:
##########
@@ -0,0 +1,297 @@
+#ifndef BRPC_FLATBUFFERS_IMPL_H_
+#define BRPC_FLATBUFFERS_IMPL_H_
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string>
+#include <flatbuffers/flatbuffers.h>
+#include "butil/iobuf.h"
+#include "butil/single_iobuf.h"
+#include "brpc/details/flatbuffers_common.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+class MessageBuilder;
+
+// Custom allocator for FlatBuffers that uses IOBuf as underlying storage.
+// This allocator manages memory allocation for FlatBuffers messages within
+// brpc's zero-copy buffer system, enabling efficient serialization and
+// deserialization without unnecessary memory copies.
+class SlabAllocator : public ::flatbuffers::Allocator {
+public:
+    SlabAllocator() {}
+
+    SlabAllocator(const SlabAllocator &other) = delete;
+
+    SlabAllocator &operator=(const SlabAllocator &other) = delete;
+
+    SlabAllocator(SlabAllocator &&other) {
+        // default-construct and swap idiom
+        swap(other);
+    }
+
+    SlabAllocator &operator=(SlabAllocator &&other) {
+    // move-construct and swap idiom
+        SlabAllocator temp(std::move(other));
+        swap(temp);
+        return *this;
+    }
+
+    void swap(SlabAllocator &other) {
+        _iobuf.swap(other._iobuf);
+    }
+
+    virtual ~SlabAllocator() {}
+    /*
+    * Allocate memory from the slab allocator.
+    * buffer struct: fb header + fb message
+    */
+    virtual uint8_t *allocate(size_t size);
+
+    virtual void deallocate(uint8_t *p, size_t size) override {
+        if (p == _fb_begin_head) {
+            _iobuf.deallocate((void*)_full_buf_head);
+        }
+    }
+
+    void deallocate(void *p) {
+        _iobuf.deallocate(p);
+    }
+
+    virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
+                                        size_t new_size, size_t in_use_back,
+                                        size_t in_use_front);
+protected:
+    void memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t *new_p,
+                        size_t new_size, size_t in_use_back,
+                        size_t in_use_front);
+private:
+
+    butil::SingleIOBuf &get_iobuf() {
+        return _iobuf;
+    }
+    uint8_t *_full_buf_head;
+    uint8_t *_fb_begin_head;
+    size_t _old_size_param;
+    butil::SingleIOBuf _iobuf;
+    friend class MessageBuilder;
+};
+
+// SlabAllocatorMember is a hack to ensure that the MessageBuilder's
+// slab_allocator_ member is constructed before the FlatBufferBuilder, since
+// the allocator is used in the FlatBufferBuilder ctor.
+struct SlabAllocatorMember {
+  SlabAllocator slab_allocator_;
+};
+
+// Represents a FlatBuffers message in brpc's zero-copy buffer system.
+// This class wraps a FlatBuffers message stored in an IOBuf
+// The message is move-only and cannot be copied.
+class Message {
+public:
+    Message() : _meta_size(0), _msg_size(0) {}
+ 
+    Message &operator=(const Message &other) = delete;
+private:
+    Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size,
+            uint32_t msg_size);
+
+    Message(const butil::IOBuf::BlockRef &ref,
+                    uint32_t meta_size,
+                    uint32_t msg_size);
+    friend class MessageBuilder;
+public:
+    Message(Message &&other) = default;
+
+    Message(const Message &other) = delete; 
+
+    Message &operator=(Message &&other) = default;
+
+    void *mutable_data() const {
+        return (void *)const_cast<uint8_t *>(data());
+    }
+
+    const uint8_t *data() const {
+        const uint8_t *buf = (const uint8_t *)_iobuf.get_begin();
+        return buf + _meta_size;
+    }
+
+    void *mutable_buf_begin() const {
+        return const_cast<void*>(_iobuf.get_begin());
+    }
+
+    void *reduce_meta_size_and_get_buf(uint32_t new_size);
+
+    uint32_t get_meta_size() const {
+        return _meta_size;
+    }
+
+    size_t size() const {
+        return _msg_size;
+    }
+
+    template<class T>
+    bool Verify() const {
+        ::flatbuffers::Verifier verifier(data(), size());
+        return verifier.VerifyBuffer<T>(nullptr);
+    }
+
+    template<class T> T *GetMutableRoot() { return 
::flatbuffers::GetMutableRoot<T>(mutable_data()); }
+    template<class T> const T *GetRoot() const { return 
::flatbuffers::GetRoot<T>(data()); }
+
+    bool parse_msg_from_iobuf(const butil::IOBuf& buf, size_t msg_size, size_t 
meta_size);
+
+    bool append_msg_to_iobuf(butil::IOBuf& buf);
+
+    void Clear() {
+        _iobuf.reset(); 
+        _meta_size = 0; 
+        _msg_size = 0;
+    }
+private:
+    butil::SingleIOBuf _iobuf;
+    uint32_t _meta_size;
+    uint32_t _msg_size;
+};
+
+// MessageBuilder is a BRPC-specific FlatBufferBuilder that uses SlabAllocator
+// to allocate BRPC buffers.
+class MessageBuilder : private SlabAllocatorMember,
+                       public ::flatbuffers::FlatBufferBuilder {
+public:
+    explicit MessageBuilder(::flatbuffers::uoffset_t initial_size = 1024)
+        : ::flatbuffers::FlatBufferBuilder(initial_size, &slab_allocator_, 
false) {}
+
+    MessageBuilder(const MessageBuilder &other) = delete;
+
+    MessageBuilder &operator=(const MessageBuilder &other) = delete;
+
+    MessageBuilder(MessageBuilder &&other)
+        : ::flatbuffers::FlatBufferBuilder(1024, &slab_allocator_, false) {
+        // Default construct and swap idiom.
+        Swap(other);
+    }
+
+  /// Create a MessageBuilder from a FlatBufferBuilder.
+    explicit MessageBuilder(::flatbuffers::FlatBufferBuilder &&src,
+                        void (*dealloc)(void *) = NULL)
+      : ::flatbuffers::FlatBufferBuilder(1024, &slab_allocator_, false) {
+        src.Swap(*this);
+        src.SwapBufAllocator(*this);
+        if (buf_.capacity()) {
+            uint8_t *buf = buf_.scratch_data();  // pointer to memory
+            size_t capacity = buf_.capacity();   // size of memory
+            slab_allocator_._iobuf.assign_user_data((void*)buf, capacity, 
dealloc);
+        } else {
+            slab_allocator_._iobuf.reset();
+        }
+    }
+
+  /// Move-assign a FlatBufferBuilder to a MessageBuilder.
+  /// Only FlatBufferBuilder with default allocator (basically, nullptr) is
+  /// supported.
+    MessageBuilder &operator=(::flatbuffers::FlatBufferBuilder &&src) {
+        // Move construct a temporary and swap
+        MessageBuilder temp(std::move(src));
+        Swap(temp);
+        return *this;
+    }
+
+    MessageBuilder &operator=(MessageBuilder &&other) {
+        // Move construct a temporary and swap
+        MessageBuilder temp(std::move(other));
+        Swap(temp);
+        return *this;
+    }
+
+    void Swap(MessageBuilder &other) {
+        slab_allocator_.swap(other.slab_allocator_);
+        ::flatbuffers::FlatBufferBuilder::Swap(other);
+        // After swapping the FlatBufferBuilder, we swap back the allocator, 
which
+        // restores the original allocator back in place. This is necessary 
because
+        // MessageBuilder's allocator is its own member (SlabAllocatorMember). 
The
+        // allocator passed to FlatBufferBuilder::vector_downward must point 
to this
+        // member.
+        buf_.swap_allocator(other.buf_);
+    }
+
+    ~MessageBuilder() {}
+
+    // GetMessage extracts the subslab of the buffer corresponding to the
+    // flatbuffers-encoded region and wraps it in a `Message` to handle buffer
+    // ownership.
+    // Message GetMessage() = delete;
+
+    Message ReleaseMessage();
+};
+
+struct BrpcDescriptorTable {
+    const std::string prefix;
+    const std::string service_name;
+    const std::string method_name_list; // split by blank
+};
+
+class ServiceDescriptor {
+public:
+    ServiceDescriptor() : name_("")
+                        ,full_name_("")
+                        ,methods_(NULL)
+                        ,method_count_(0)
+                        ,index_(0) {}
+    ~ServiceDescriptor();
+    int init(const BrpcDescriptorTable& table);
+    // The name of the service, not including its containing scope.
+    const std::string& name() const {return name_;}
+    // The fully-qualified name of the service, scope delimited by periods.
+    const std::string& full_name() const {return full_name_;}
+    // Index of this service within the file's services array.
+    uint32_t index() const {return index_;}
+    // The number of methods this service defines.
+    int method_count() const {return method_count_;}
+    // Gets a MethodDescriptor by index, where 0 <= index < method_count().
+    // These are returned in the order they were defined in the .proto file.
+    const MethodDescriptor* method(int index) const;
+private:
+    void release_descriptor(); 
+    std::string name_;
+    std::string full_name_;
+    MethodDescriptor** methods_;
+    int method_count_;
+    uint32_t index_;
+    FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(ServiceDescriptor);
+};
+
+int parse_service_descriptors(const BrpcDescriptorTable& descriptor_table,
+                    ServiceDescriptor** descriptor_out);
+
+class MethodDescriptor {
+public:
+    MethodDescriptor(const char* prefix, 
+                            const char* name, 
+                            const ServiceDescriptor* service, 
+                            int index);
+    // Name of this method, not including containing scope.
+    const std::string& name() const {return name_;}
+    // The fully-qualified name of the method, scope delimited by periods.
+    const std::string& full_name() const {return full_name_;}
+    // Index within the service's Descriptor.
+    int index() const {return index_;}
+    const ServiceDescriptor* service() const { return service_;}
+private:
+    std::string name_;
+    std::string full_name_;
+    const ServiceDescriptor* service_;
+    int index_;
+    FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(MethodDescriptor);
+};
+
+bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, 
size_t meta_remaind = 0);

Review Comment:
   Spelling error in parameter name: The parameter name 'meta_remaind' should 
be 'meta_remain' or 'meta_remaining' for correct spelling.
   ```suggestion
   bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& 
buf, size_t meta_remaining = 0);
   ```



##########
example/benchmark_fb/client.cpp:
##########
@@ -0,0 +1,130 @@
+#include <gflags/gflags.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include <bvar/bvar.h>
+
+#include "test.brpc.fb.h"
+
+
+DEFINE_int32(thread_num, 1, "Number of threads to send requests");
+DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with 
requests");
+DEFINE_int32(request_size, 16, "Bytes of each request");
+DEFINE_string(servers, "0.0.0.0:8002", "IP Address of server");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
+
+std::string g_request;
+butil::IOBuf g_attachment;
+
+bvar::LatencyRecorder g_latency_recorder("client");
+bvar::LatencyRecorder g_msg_recorder("msg");
+bvar::Adder<int> g_error_count("client_error_count");
+
+static void* sender(void* arg) {
+    test::BenchmarkServiceStub stub(static_cast<brpc::Channel*>(arg));
+    int log_id = 0;
+    while (!brpc::IsAskedToQuit()) {
+        brpc::Controller cntl;
+        brpc::flatbuffers::Message response;
+
+        cntl.set_log_id(log_id++);
+        cntl.request_attachment().append(g_attachment);
+
+        uint64_t msg_begin_ns = butil::cpuwide_time_ns();
+        brpc::flatbuffers::MessageBuilder mb;
+        auto message = mb.CreateString(g_request);
+        auto req = test::CreateBenchmarkRequest(mb, 123, 333, 1111, 2222, 0, 
message);
+        mb.Finish(req);
+        brpc::flatbuffers::Message request = mb.ReleaseMessage();
+
+        uint64_t msg_end_ns = butil::cpuwide_time_ns();
+        stub.Test(&cntl, &request, &response, NULL);
+
+        if (!cntl.Failed()) {
+            g_latency_recorder << cntl.latency_us();
+            g_msg_recorder << (msg_end_ns - msg_begin_ns);
+        } else {
+            g_error_count << 1;
+            CHECK(brpc::IsAskedToQuit())
+                << "error=" << cntl.ErrorText() << " latency=" << 
cntl.latency_us();
+            // We can't connect to the server, sleep a while. Notice that this
+            // is a specific sleeping to prevent this thread from spinning too
+            // fast. You should continue the business logic in a production
+            // server rather than sleeping.
+            bthread_usleep(50000);
+        }
+    }
+    return NULL;
+}
+
+int main(int argc, char* argv[]) {
+    GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
+    // Print parameter information in one line
+    LOG(INFO) << "Parameters - request_size : " << FLAGS_request_size
+              << ", attachment_size: " << FLAGS_attachment_size
+              << ", thread_num: " << FLAGS_thread_num;
+
+    // A Channel represents a communication line to a Server. Notice that
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+
+    // Initialize the channel, NULL means using default options.
+    brpc::ChannelOptions options;
+    options.protocol = "fb_rpc";
+    options.connection_type = "";
+    options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
+    options.timeout_ms = FLAGS_timeout_ms;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_servers.c_str(), &options) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+    if (FLAGS_attachment_size > 0) {
+        void* _attachment_addr = malloc(FLAGS_attachment_size);
+        if (!_attachment_addr) {
+            LOG(ERROR) << "Fail to alloc _attachment from system heap";
+            return -1;
+        }
+        g_attachment.append(_attachment_addr, FLAGS_attachment_size);
+        free(_attachment_addr);
+    }
+    if (FLAGS_request_size < 0) {
+        LOG(ERROR) << "Bad request_size=" << FLAGS_request_size;
+        return -1;
+    }
+    g_request.resize(FLAGS_request_size, 'r');
+
+    if (FLAGS_dummy_port >= 0) {
+        brpc::StartDummyServerAt(FLAGS_dummy_port);
+    }
+
+    std::vector<bthread_t> bids;
+    bids.resize(FLAGS_thread_num);
+    for (int i = 0; i < FLAGS_thread_num; ++i) {
+        if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) {
+            LOG(ERROR) << "Fail to create bthread";
+            return -1;
+        }
+    }
+
+    while (!brpc::IsAskedToQuit()) {
+        sleep(1);
+        LOG(INFO) << "Sending EchoRequest at qps=" << 
(g_latency_recorder.qps(1) / 1000)

Review Comment:
   Inconsistent error message: The error message says "Sending EchoRequest" but 
this is a BenchmarkRequest, not EchoRequest. The message should be updated to 
"Sending BenchmarkRequest" or simply "Sending request" for consistency.
   ```suggestion
           LOG(INFO) << "Sending request at qps=" << (g_latency_recorder.qps(1) 
/ 1000)
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;
+}
+
+void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t 
*new_p,
+                    size_t new_size, size_t in_use_back,
+                    size_t in_use_front) {
+    memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back,
+        in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+int ServiceDescriptor::init(const BrpcDescriptorTable& table) {
+    if (table.service_name == "" || table.prefix == "" ||
+                            table.method_name_list == "") {
+        errno = EINVAL;
+        return -1;
+    }
+    std::vector<std::string> res;
+    std::string strs = table.method_name_list + METHOD_SPLIT;
+    size_t pos = strs.find(METHOD_SPLIT);
+    while(pos != strs.npos) {
+        std::string tmp = strs.substr(0, pos);
+        if (tmp != METHOD_SPLIT && tmp != "\n") {
+            res.push_back(tmp);
+        }
+        strs = strs.substr(pos + 1, strs.size());
+        pos = strs.find(METHOD_SPLIT);
+    }
+    method_count_ = res.size();
+    if (method_count_ <= 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    name_ = table.service_name;
+    full_name_ = table.prefix + std::string(PREFIX_SPLIT) + name_;
+    butil::MurmurHash3_x86_32(full_name_.c_str(), full_name_.size(), 1, 
&index_);
+    methods_ = new MethodDescriptor*[method_count_];
+    if (!methods_) {
+        return -1;
+    }
+    memset(methods_, 0, method_count_ * sizeof(MethodDescriptor*));
+    for (int i = 0; i < method_count_; ++i) {
+        methods_[i] = new MethodDescriptor(table.prefix.c_str(), 
res[i].c_str(), this, i);
+        if (!methods_[i]) {
+            release_descriptor();
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void ServiceDescriptor::release_descriptor() {
+    if (methods_ != NULL ) {
+        for (int i = 0; i < method_count_; ++i) {
+            if (methods_[i] != NULL) {
+                delete methods_[i];
+            }
+            methods_[i] = NULL;
+        }
+        delete methods_;
+        methods_ = NULL;
+    }
+}
+
+ServiceDescriptor::~ServiceDescriptor() {
+    release_descriptor();
+}
+
+const MethodDescriptor* ServiceDescriptor::method(int index) const {
+    if (index < 0 || index >= method_count_) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (!methods_) {
+        errno = EPERM;
+        return NULL;
+    }
+    return methods_[index];
+}
+
+MethodDescriptor::MethodDescriptor(const char* prefix,
+                            const char* name,
+                            const ServiceDescriptor* service,
+                            int index) {
+    name_ = name;
+    full_name_ = std::string(prefix) + std::string(PREFIX_SPLIT) + 
std::string(name);
+    service_ = service;
+    index_ = index;
+}
+
+Message MessageBuilder::ReleaseMessage() {
+    const uint8_t *msg_data = buf_.data();                  // pointer to msg
+    uint32_t msg_size = static_cast<uint32_t>(buf_.size());
+    butil::SingleIOBuf& iobuf = slab_allocator_.get_iobuf();
+    const uint8_t *buf_data = (const uint8_t *) iobuf.get_begin();
+    // Calculate offsets from the buffer start
+    // reserve the memory space for rpc header and meta
+    const uint8_t *data_with_rpc = msg_data - DEFAULT_RESERVE_SIZE;
+    uint32_t new_msg_size = msg_size;
+
+    new_msg_size += DEFAULT_RESERVE_SIZE;
+    uint32_t begin = data_with_rpc - buf_data;
+
+    const butil::IOBuf::BlockRef& ref = iobuf.get_cur_ref();
+    butil::IOBuf::BlockRef sub_ref = {ref.offset + begin, new_msg_size, 
ref.block};
+    Message msg(sub_ref, DEFAULT_RESERVE_SIZE, msg_size);
+    Reset();
+    return msg;
+}
+
+Message::Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)
+                    : _iobuf(iobuf)
+                    , _meta_size(meta_size)
+                    , _msg_size(msg_size) {
+    if (msg_size == 0) {
+        _msg_size = _iobuf.get_length() - meta_size;
+    }
+}
+
+Message::Message(const butil::IOBuf::BlockRef &ref,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)

Review Comment:
   Const-correctness issue: The default parameter values for meta_size and 
msg_size should not be specified in the constructor definition when they are 
private constructors. Default parameter values should only be declared in the 
header file (lines 101-103 in flatbuffers_impl.h), not redeclared in the 
implementation. This can cause confusion and potential mismatches.



##########
example/benchmark_fb/test_generated.h:
##########
@@ -0,0 +1,284 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+
+#ifndef FLATBUFFERS_GENERATED_TEST_TEST_H_
+#define FLATBUFFERS_GENERATED_TEST_TEST_H_
+
+#include "flatbuffers/flatbuffers.h"
+
+// Ensure the included flatbuffers.h is the same version as when this file was
+// generated, otherwise it may not be compatible.
+static_assert(FLATBUFFERS_VERSION_MAJOR == 25 &&
+              FLATBUFFERS_VERSION_MINOR == 9 &&
+              FLATBUFFERS_VERSION_REVISION == 23,

Review Comment:
   Hardcoded FlatBuffers version: The static_assert on lines 11-14 checks for a 
specific FlatBuffers version (25.9.23). This is overly strict and will cause 
compilation failures if users have a different (but compatible) version of 
FlatBuffers installed. Consider whether this strict version check is necessary, 
or if it should be relaxed to check only major version compatibility. Since 
this is auto-generated code, this may need to be addressed in the code 
generator configuration.
   ```suggestion
   static_assert(FLATBUFFERS_VERSION_MAJOR == 25,
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;

Review Comment:
   Missing null pointer check: After calling _iobuf.allocate(real_size) on line 
24, the return value should be checked for NULL before proceeding. If 
allocation fails, _full_buf_head will be NULL, and subsequent pointer 
arithmetic (_fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE) will result 
in undefined behavior. Add a null check and return NULL if allocation fails.



##########
src/brpc/details/flatbuffers_common.h:
##########
@@ -0,0 +1,100 @@
+#ifndef BRPC_FLATBUFFERS_COMMON_H_
+#define BRPC_FLATBUFFERS_COMMON_H_

Review Comment:
   Missing Apache License header. All source files in brpc should include the 
standard Apache License 2.0 header comment block at the top of the file, as 
seen in other files like http_message.h. This is required for proper licensing 
compliance.



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;
+}
+
+void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t 
*new_p,
+                    size_t new_size, size_t in_use_back,
+                    size_t in_use_front) {
+    memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back,
+        in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+int ServiceDescriptor::init(const BrpcDescriptorTable& table) {
+    if (table.service_name == "" || table.prefix == "" ||
+                            table.method_name_list == "") {
+        errno = EINVAL;
+        return -1;
+    }
+    std::vector<std::string> res;
+    std::string strs = table.method_name_list + METHOD_SPLIT;
+    size_t pos = strs.find(METHOD_SPLIT);
+    while(pos != strs.npos) {
+        std::string tmp = strs.substr(0, pos);
+        if (tmp != METHOD_SPLIT && tmp != "\n") {
+            res.push_back(tmp);
+        }
+        strs = strs.substr(pos + 1, strs.size());
+        pos = strs.find(METHOD_SPLIT);
+    }
+    method_count_ = res.size();
+    if (method_count_ <= 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    name_ = table.service_name;
+    full_name_ = table.prefix + std::string(PREFIX_SPLIT) + name_;
+    butil::MurmurHash3_x86_32(full_name_.c_str(), full_name_.size(), 1, 
&index_);
+    methods_ = new MethodDescriptor*[method_count_];
+    if (!methods_) {
+        return -1;
+    }
+    memset(methods_, 0, method_count_ * sizeof(MethodDescriptor*));
+    for (int i = 0; i < method_count_; ++i) {
+        methods_[i] = new MethodDescriptor(table.prefix.c_str(), 
res[i].c_str(), this, i);
+        if (!methods_[i]) {
+            release_descriptor();
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void ServiceDescriptor::release_descriptor() {
+    if (methods_ != NULL ) {
+        for (int i = 0; i < method_count_; ++i) {
+            if (methods_[i] != NULL) {
+                delete methods_[i];
+            }
+            methods_[i] = NULL;
+        }
+        delete methods_;
+        methods_ = NULL;
+    }
+}
+
+ServiceDescriptor::~ServiceDescriptor() {
+    release_descriptor();
+}
+
+const MethodDescriptor* ServiceDescriptor::method(int index) const {
+    if (index < 0 || index >= method_count_) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (!methods_) {
+        errno = EPERM;
+        return NULL;
+    }
+    return methods_[index];
+}
+
+MethodDescriptor::MethodDescriptor(const char* prefix,
+                            const char* name,
+                            const ServiceDescriptor* service,
+                            int index) {
+    name_ = name;
+    full_name_ = std::string(prefix) + std::string(PREFIX_SPLIT) + 
std::string(name);
+    service_ = service;
+    index_ = index;
+}
+
+Message MessageBuilder::ReleaseMessage() {
+    const uint8_t *msg_data = buf_.data();                  // pointer to msg
+    uint32_t msg_size = static_cast<uint32_t>(buf_.size());
+    butil::SingleIOBuf& iobuf = slab_allocator_.get_iobuf();
+    const uint8_t *buf_data = (const uint8_t *) iobuf.get_begin();
+    // Calculate offsets from the buffer start
+    // reserve the memory space for rpc header and meta
+    const uint8_t *data_with_rpc = msg_data - DEFAULT_RESERVE_SIZE;
+    uint32_t new_msg_size = msg_size;
+
+    new_msg_size += DEFAULT_RESERVE_SIZE;
+    uint32_t begin = data_with_rpc - buf_data;
+
+    const butil::IOBuf::BlockRef& ref = iobuf.get_cur_ref();
+    butil::IOBuf::BlockRef sub_ref = {ref.offset + begin, new_msg_size, 
ref.block};
+    Message msg(sub_ref, DEFAULT_RESERVE_SIZE, msg_size);
+    Reset();
+    return msg;
+}
+
+Message::Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)
+                    : _iobuf(iobuf)
+                    , _meta_size(meta_size)
+                    , _msg_size(msg_size) {
+    if (msg_size == 0) {
+        _msg_size = _iobuf.get_length() - meta_size;
+    }
+}
+
+Message::Message(const butil::IOBuf::BlockRef &ref,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)
+                    : _iobuf(ref)
+                    , _meta_size(meta_size)
+                    , _msg_size(msg_size) {
+    if (msg_size == 0) {
+        _msg_size = _iobuf.get_length() - meta_size;
+    }
+}
+
+bool Message::parse_msg_from_iobuf(const butil::IOBuf& buf, size_t msg_size, 
size_t meta_size) {
+    size_t buf_size = buf.size();
+    if (buf_size != (msg_size + meta_size)) {
+        return false;
+    }
+    if (!_iobuf.assign(buf, buf_size)) {
+        return false;
+    }
+    _meta_size = meta_size;
+    _msg_size = msg_size;
+
+    return true;
+}
+
+bool Message::append_msg_to_iobuf(butil::IOBuf& buf) {
+    _iobuf.append_to(&buf);
+    return true;
+}
+
+void *Message::reduce_meta_size_and_get_buf(uint32_t new_size) {
+    if (new_size < _meta_size) {
+        uint32_t off = _meta_size - new_size;
+        const butil::IOBuf::BlockRef& ref = _iobuf.get_cur_ref();
+        butil::IOBuf::BlockRef sub_ref = {ref.offset + off, ref.length - off, 
ref.block};
+        _iobuf = butil::SingleIOBuf(sub_ref);
+        _meta_size = new_size;
+        return const_cast<void*>(_iobuf.get_begin());
+    } else if (new_size == _meta_size) {
+        return const_cast<void*>(_iobuf.get_begin());
+    }
+    LOG(WARNING) << "You should change DEFAULT_RESERVE_SIZE, new_size=" << 
new_size << " > _meta_size=" << _meta_size;
+    return NULL;
+}
+
+int parse_service_descriptors(const BrpcDescriptorTable& descriptor_table,
+                    ServiceDescriptor** descriptor_out) {
+    if(descriptor_out == NULL) {
+        errno = EINVAL;
+        return -1;
+    }
+    ServiceDescriptor *out = new ServiceDescriptor;
+    if (!out) {
+        return -1;
+    }
+    int ret = out->init(descriptor_table);
+    if (ret < 0) {
+        delete out;
+        return -1;
+    }
+    *descriptor_out = out;
+    return 0;
+}
+
+bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, 
size_t meta_remaind) {
+    if (!msg || msg_size == 0 || meta_remaind > msg_size) {
+        return false;
+    }
+    return msg->parse_msg_from_iobuf(buf, msg_size, meta_remaind);

Review Comment:
   Spelling error in parameter name: The parameter name 'meta_remaind' should 
be 'meta_remain' or 'meta_remaining' for correct spelling. This typo appears in 
both the declaration (line 290 of flatbuffers_impl.h) and implementation.
   ```suggestion
   bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& 
buf, size_t meta_remaining) {
       if (!msg || msg_size == 0 || meta_remaining > msg_size) {
           return false;
       }
       return msg->parse_msg_from_iobuf(buf, msg_size, meta_remaining);
   ```



##########
src/brpc/details/flatbuffers_impl.h:
##########
@@ -0,0 +1,297 @@
+#ifndef BRPC_FLATBUFFERS_IMPL_H_
+#define BRPC_FLATBUFFERS_IMPL_H_
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string>
+#include <flatbuffers/flatbuffers.h>
+#include "butil/iobuf.h"
+#include "butil/single_iobuf.h"
+#include "brpc/details/flatbuffers_common.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+class MessageBuilder;
+
+// Custom allocator for FlatBuffers that uses IOBuf as underlying storage.
+// This allocator manages memory allocation for FlatBuffers messages within
+// brpc's zero-copy buffer system, enabling efficient serialization and
+// deserialization without unnecessary memory copies.
+class SlabAllocator : public ::flatbuffers::Allocator {
+public:
+    SlabAllocator() {}
+
+    SlabAllocator(const SlabAllocator &other) = delete;
+
+    SlabAllocator &operator=(const SlabAllocator &other) = delete;
+
+    SlabAllocator(SlabAllocator &&other) {
+        // default-construct and swap idiom
+        swap(other);
+    }
+
+    SlabAllocator &operator=(SlabAllocator &&other) {
+    // move-construct and swap idiom
+        SlabAllocator temp(std::move(other));
+        swap(temp);
+        return *this;
+    }
+
+    void swap(SlabAllocator &other) {
+        _iobuf.swap(other._iobuf);
+    }

Review Comment:
   Member variable swap missing: The swap() method only swaps _iobuf, but 
SlabAllocator has other member variables (_full_buf_head, _fb_begin_head, 
_old_size_param) that should also be swapped to maintain consistency. The move 
constructor and move assignment operator rely on swap(), so they will leave 
these members in an inconsistent state. Add std::swap calls for all member 
variables.



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;
+}
+
+void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t 
*new_p,
+                    size_t new_size, size_t in_use_back,
+                    size_t in_use_front) {
+    memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back,
+        in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+int ServiceDescriptor::init(const BrpcDescriptorTable& table) {
+    if (table.service_name == "" || table.prefix == "" ||
+                            table.method_name_list == "") {
+        errno = EINVAL;
+        return -1;
+    }
+    std::vector<std::string> res;
+    std::string strs = table.method_name_list + METHOD_SPLIT;
+    size_t pos = strs.find(METHOD_SPLIT);
+    while(pos != strs.npos) {
+        std::string tmp = strs.substr(0, pos);
+        if (tmp != METHOD_SPLIT && tmp != "\n") {
+            res.push_back(tmp);
+        }
+        strs = strs.substr(pos + 1, strs.size());
+        pos = strs.find(METHOD_SPLIT);
+    }
+    method_count_ = res.size();
+    if (method_count_ <= 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    name_ = table.service_name;
+    full_name_ = table.prefix + std::string(PREFIX_SPLIT) + name_;
+    butil::MurmurHash3_x86_32(full_name_.c_str(), full_name_.size(), 1, 
&index_);
+    methods_ = new MethodDescriptor*[method_count_];
+    if (!methods_) {
+        return -1;
+    }
+    memset(methods_, 0, method_count_ * sizeof(MethodDescriptor*));
+    for (int i = 0; i < method_count_; ++i) {
+        methods_[i] = new MethodDescriptor(table.prefix.c_str(), 
res[i].c_str(), this, i);
+        if (!methods_[i]) {
+            release_descriptor();
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void ServiceDescriptor::release_descriptor() {
+    if (methods_ != NULL ) {
+        for (int i = 0; i < method_count_; ++i) {
+            if (methods_[i] != NULL) {
+                delete methods_[i];
+            }
+            methods_[i] = NULL;
+        }
+        delete methods_;

Review Comment:
   Memory leak: methods_ is allocated as an array using 'new 
MethodDescriptor*[method_count_]' on line 79, but it's deallocated with 'delete 
methods_' on line 102 instead of 'delete[] methods_'. This is undefined 
behavior and will cause memory corruption or leaks. Use 'delete[]' for arrays 
allocated with 'new[]'.
   ```suggestion
           delete[] methods_;
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;
+}
+
+void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t 
*new_p,
+                    size_t new_size, size_t in_use_back,
+                    size_t in_use_front) {
+    memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back,
+        in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+int ServiceDescriptor::init(const BrpcDescriptorTable& table) {
+    if (table.service_name == "" || table.prefix == "" ||
+                            table.method_name_list == "") {
+        errno = EINVAL;
+        return -1;
+    }
+    std::vector<std::string> res;
+    std::string strs = table.method_name_list + METHOD_SPLIT;
+    size_t pos = strs.find(METHOD_SPLIT);
+    while(pos != strs.npos) {
+        std::string tmp = strs.substr(0, pos);
+        if (tmp != METHOD_SPLIT && tmp != "\n") {
+            res.push_back(tmp);
+        }
+        strs = strs.substr(pos + 1, strs.size());
+        pos = strs.find(METHOD_SPLIT);
+    }
+    method_count_ = res.size();
+    if (method_count_ <= 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    name_ = table.service_name;
+    full_name_ = table.prefix + std::string(PREFIX_SPLIT) + name_;
+    butil::MurmurHash3_x86_32(full_name_.c_str(), full_name_.size(), 1, 
&index_);
+    methods_ = new MethodDescriptor*[method_count_];
+    if (!methods_) {
+        return -1;
+    }
+    memset(methods_, 0, method_count_ * sizeof(MethodDescriptor*));
+    for (int i = 0; i < method_count_; ++i) {
+        methods_[i] = new MethodDescriptor(table.prefix.c_str(), 
res[i].c_str(), this, i);
+        if (!methods_[i]) {
+            release_descriptor();
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void ServiceDescriptor::release_descriptor() {
+    if (methods_ != NULL ) {
+        for (int i = 0; i < method_count_; ++i) {
+            if (methods_[i] != NULL) {
+                delete methods_[i];
+            }
+            methods_[i] = NULL;
+        }
+        delete methods_;
+        methods_ = NULL;
+    }
+}
+
+ServiceDescriptor::~ServiceDescriptor() {
+    release_descriptor();
+}
+
+const MethodDescriptor* ServiceDescriptor::method(int index) const {
+    if (index < 0 || index >= method_count_) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (!methods_) {
+        errno = EPERM;
+        return NULL;
+    }
+    return methods_[index];
+}
+
+MethodDescriptor::MethodDescriptor(const char* prefix,
+                            const char* name,
+                            const ServiceDescriptor* service,
+                            int index) {
+    name_ = name;
+    full_name_ = std::string(prefix) + std::string(PREFIX_SPLIT) + 
std::string(name);
+    service_ = service;
+    index_ = index;
+}
+
+Message MessageBuilder::ReleaseMessage() {
+    const uint8_t *msg_data = buf_.data();                  // pointer to msg
+    uint32_t msg_size = static_cast<uint32_t>(buf_.size());
+    butil::SingleIOBuf& iobuf = slab_allocator_.get_iobuf();
+    const uint8_t *buf_data = (const uint8_t *) iobuf.get_begin();
+    // Calculate offsets from the buffer start
+    // reserve the memory space for rpc header and meta
+    const uint8_t *data_with_rpc = msg_data - DEFAULT_RESERVE_SIZE;
+    uint32_t new_msg_size = msg_size;
+
+    new_msg_size += DEFAULT_RESERVE_SIZE;
+    uint32_t begin = data_with_rpc - buf_data;
+
+    const butil::IOBuf::BlockRef& ref = iobuf.get_cur_ref();
+    butil::IOBuf::BlockRef sub_ref = {ref.offset + begin, new_msg_size, 
ref.block};
+    Message msg(sub_ref, DEFAULT_RESERVE_SIZE, msg_size);
+    Reset();
+    return msg;
+}
+
+Message::Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size = 0,
+            uint32_t msg_size = 0)

Review Comment:
   Const-correctness issue: The default parameter values for meta_size and 
msg_size should not be specified in the constructor definition when they are 
private constructors. Default parameter values should only be declared in the 
header file (lines 97-103 in flatbuffers_impl.h), not redeclared in the 
implementation. This can cause confusion and potential mismatches.



##########
src/brpc/details/flatbuffers_impl.h:
##########
@@ -0,0 +1,297 @@
+#ifndef BRPC_FLATBUFFERS_IMPL_H_
+#define BRPC_FLATBUFFERS_IMPL_H_
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string>
+#include <flatbuffers/flatbuffers.h>
+#include "butil/iobuf.h"
+#include "butil/single_iobuf.h"
+#include "brpc/details/flatbuffers_common.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+class MessageBuilder;
+
+// Custom allocator for FlatBuffers that uses IOBuf as underlying storage.
+// This allocator manages memory allocation for FlatBuffers messages within
+// brpc's zero-copy buffer system, enabling efficient serialization and
+// deserialization without unnecessary memory copies.
+class SlabAllocator : public ::flatbuffers::Allocator {
+public:
+    SlabAllocator() {}
+
+    SlabAllocator(const SlabAllocator &other) = delete;
+
+    SlabAllocator &operator=(const SlabAllocator &other) = delete;
+
+    SlabAllocator(SlabAllocator &&other) {
+        // default-construct and swap idiom
+        swap(other);
+    }
+
+    SlabAllocator &operator=(SlabAllocator &&other) {
+    // move-construct and swap idiom
+        SlabAllocator temp(std::move(other));
+        swap(temp);
+        return *this;
+    }
+
+    void swap(SlabAllocator &other) {
+        _iobuf.swap(other._iobuf);
+    }
+
+    virtual ~SlabAllocator() {}
+    /*
+    * Allocate memory from the slab allocator.
+    * buffer struct: fb header + fb message
+    */
+    virtual uint8_t *allocate(size_t size);
+
+    virtual void deallocate(uint8_t *p, size_t size) override {
+        if (p == _fb_begin_head) {
+            _iobuf.deallocate((void*)_full_buf_head);
+        }
+    }
+
+    void deallocate(void *p) {
+        _iobuf.deallocate(p);
+    }
+
+    virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
+                                        size_t new_size, size_t in_use_back,
+                                        size_t in_use_front);
+protected:
+    void memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t *new_p,
+                        size_t new_size, size_t in_use_back,
+                        size_t in_use_front);
+private:
+
+    butil::SingleIOBuf &get_iobuf() {
+        return _iobuf;
+    }
+    uint8_t *_full_buf_head;
+    uint8_t *_fb_begin_head;
+    size_t _old_size_param;
+    butil::SingleIOBuf _iobuf;
+    friend class MessageBuilder;
+};
+
+// SlabAllocatorMember is a hack to ensure that the MessageBuilder's
+// slab_allocator_ member is constructed before the FlatBufferBuilder, since
+// the allocator is used in the FlatBufferBuilder ctor.
+struct SlabAllocatorMember {
+  SlabAllocator slab_allocator_;
+};
+
+// Represents a FlatBuffers message in brpc's zero-copy buffer system.
+// This class wraps a FlatBuffers message stored in an IOBuf
+// The message is move-only and cannot be copied.
+class Message {
+public:
+    Message() : _meta_size(0), _msg_size(0) {}
+ 
+    Message &operator=(const Message &other) = delete;
+private:
+    Message(const butil::SingleIOBuf &iobuf,
+            uint32_t meta_size,
+            uint32_t msg_size);
+
+    Message(const butil::IOBuf::BlockRef &ref,
+                    uint32_t meta_size,
+                    uint32_t msg_size);
+    friend class MessageBuilder;
+public:
+    Message(Message &&other) = default;
+
+    Message(const Message &other) = delete; 

Review Comment:
   Redundant copy constructor deletion: The Message class has both 
'Message(const Message &other) = delete;' on line 108 and 'Message 
&operator=(const Message &other) = delete;' on line 95. However, line 108 is 
redundant because the copy constructor is already deleted on line 108. The 
order is also unusual - typically copy constructor and copy assignment operator 
are declared together. Consider removing line 95 or organizing the declarations 
more conventionally.



##########
example/benchmark_fb/client.cpp:
##########
@@ -0,0 +1,130 @@
+#include <gflags/gflags.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include <bvar/bvar.h>
+
+#include "test.brpc.fb.h"
+
+
+DEFINE_int32(thread_num, 1, "Number of threads to send requests");
+DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with 
requests");
+DEFINE_int32(request_size, 16, "Bytes of each request");
+DEFINE_string(servers, "0.0.0.0:8002", "IP Address of server");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
+
+std::string g_request;
+butil::IOBuf g_attachment;
+
+bvar::LatencyRecorder g_latency_recorder("client");
+bvar::LatencyRecorder g_msg_recorder("msg");
+bvar::Adder<int> g_error_count("client_error_count");
+
+static void* sender(void* arg) {
+    test::BenchmarkServiceStub stub(static_cast<brpc::Channel*>(arg));
+    int log_id = 0;
+    while (!brpc::IsAskedToQuit()) {
+        brpc::Controller cntl;
+        brpc::flatbuffers::Message response;
+
+        cntl.set_log_id(log_id++);
+        cntl.request_attachment().append(g_attachment);
+
+        uint64_t msg_begin_ns = butil::cpuwide_time_ns();
+        brpc::flatbuffers::MessageBuilder mb;
+        auto message = mb.CreateString(g_request);
+        auto req = test::CreateBenchmarkRequest(mb, 123, 333, 1111, 2222, 0, 
message);
+        mb.Finish(req);
+        brpc::flatbuffers::Message request = mb.ReleaseMessage();
+
+        uint64_t msg_end_ns = butil::cpuwide_time_ns();
+        stub.Test(&cntl, &request, &response, NULL);
+
+        if (!cntl.Failed()) {
+            g_latency_recorder << cntl.latency_us();
+            g_msg_recorder << (msg_end_ns - msg_begin_ns);
+        } else {
+            g_error_count << 1;
+            CHECK(brpc::IsAskedToQuit())
+                << "error=" << cntl.ErrorText() << " latency=" << 
cntl.latency_us();
+            // We can't connect to the server, sleep a while. Notice that this
+            // is a specific sleeping to prevent this thread from spinning too
+            // fast. You should continue the business logic in a production
+            // server rather than sleeping.
+            bthread_usleep(50000);
+        }
+    }
+    return NULL;
+}
+
+int main(int argc, char* argv[]) {
+    GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
+    // Print parameter information in one line
+    LOG(INFO) << "Parameters - request_size : " << FLAGS_request_size
+              << ", attachment_size: " << FLAGS_attachment_size
+              << ", thread_num: " << FLAGS_thread_num;
+
+    // A Channel represents a communication line to a Server. Notice that
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+
+    // Initialize the channel, NULL means using default options.
+    brpc::ChannelOptions options;
+    options.protocol = "fb_rpc";
+    options.connection_type = "";
+    options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
+    options.timeout_ms = FLAGS_timeout_ms;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_servers.c_str(), &options) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+    if (FLAGS_attachment_size > 0) {
+        void* _attachment_addr = malloc(FLAGS_attachment_size);
+        if (!_attachment_addr) {
+            LOG(ERROR) << "Fail to alloc _attachment from system heap";
+            return -1;
+        }
+        g_attachment.append(_attachment_addr, FLAGS_attachment_size);
+        free(_attachment_addr);
+    }
+    if (FLAGS_request_size < 0) {
+        LOG(ERROR) << "Bad request_size=" << FLAGS_request_size;
+        return -1;
+    }
+    g_request.resize(FLAGS_request_size, 'r');
+
+    if (FLAGS_dummy_port >= 0) {
+        brpc::StartDummyServerAt(FLAGS_dummy_port);
+    }
+
+    std::vector<bthread_t> bids;
+    bids.resize(FLAGS_thread_num);
+    for (int i = 0; i < FLAGS_thread_num; ++i) {
+        if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) {
+            LOG(ERROR) << "Fail to create bthread";
+            return -1;
+        }
+    }
+
+    while (!brpc::IsAskedToQuit()) {
+        sleep(1);
+        LOG(INFO) << "Sending EchoRequest at qps=" << 
(g_latency_recorder.qps(1) / 1000)
+                  << "k latency=" << g_latency_recorder.latency(1) << "us"
+                  << " msg latency=" << g_msg_recorder.latency(1) << "ns";
+    }
+
+    LOG(INFO) << "EchoClient is going to quit";

Review Comment:
   Inconsistent error message: The error message says "EchoClient is going to 
quit" but this is a benchmark client, not EchoClient. The message should be 
updated to "BenchmarkClient is going to quit" or simply "Client is going to 
quit" for consistency.
   ```suggestion
       LOG(INFO) << "Client is going to quit";
   ```



##########
src/brpc/details/flatbuffers_impl.cpp:
##########
@@ -0,0 +1,244 @@
+#include <vector>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gflags/gflags.h>
+
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/thread_local.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "brpc/details/flatbuffers_impl.h"
+
+namespace brpc {
+namespace flatbuffers {
+
+#define METHOD_SPLIT " "
+#define PREFIX_SPLIT "."
+#define DEFAULT_RESERVE_SIZE 64     // Reserve space for rpc header and meta
+
+uint8_t *SlabAllocator::allocate(size_t size) {
+    _old_size_param = size;
+    size_t real_size = size + DEFAULT_RESERVE_SIZE;
+
+    _full_buf_head = (uint8_t *)_iobuf.allocate(real_size);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    return _fb_begin_head;
+}
+
+uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
+                                        size_t old_size,
+                                        size_t new_size,
+                                        size_t in_use_back,
+                                        size_t in_use_front) {
+    _old_size_param = new_size;
+    size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;
+
+    void* old_block = _iobuf.get_cur_block();
+    butil::SingleIOBuf::target_block_inc_ref(old_block);
+    _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 
0);
+    _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
+    memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, 
in_use_front);
+    butil::SingleIOBuf::target_block_dec_ref(old_block);
+    return _fb_begin_head;

Review Comment:
   Missing null pointer check: After calling 
_iobuf.reallocate_downward(new_real_size, 0, 0) on line 39, the return value 
should be checked for NULL before proceeding. If reallocation fails, 
_full_buf_head will be NULL, and subsequent operations including pointer 
arithmetic and memcpy_downward will result in undefined behavior. Add a null 
check and handle the error case appropriately, ensuring old_block reference is 
properly cleaned up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to