Copilot commented on code in PR #3328:
URL: https://github.com/apache/brpc/pull/3328#discussion_r3413201976
##########
CMakeLists.txt:
##########
@@ -329,6 +370,11 @@ if(WITH_RDMA)
list(APPEND DYNAMIC_LIB ${RDMA_LIB})
endif()
+if(WITH_IOURING)
+ list(APPEND DYNAMIC_LIB ${IOURING_LIB})
+ set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -luring")
+endif()
+
set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto
-ldl -lz")
Review Comment:
`BRPC_PRIVATE_LIBS` is appended with `-luring` under `WITH_IOURING`, but it
is immediately overwritten by the subsequent unconditional
`set(BRPC_PRIVATE_LIBS "...")`. As a result, the generated link flags (e.g. for
pkg-config / exported variables) will miss `-luring` even when io_uring is
enabled. Move the `if(WITH_IOURING)` append below the base
`set(BRPC_PRIVATE_LIBS ...)` so that the flag survives.
##########
src/brpc/iouring_transport.cpp:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_IOURING
+
+#include <gflags/gflags.h>
+#include "butil/logging.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/iouring/iouring_helper.h"
+#include "brpc/iouring/iouring_endpoint.h"
+#include "brpc/iouring_transport.h"
+
+namespace brpc {
+
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector* g_vars;
+
+// -----------------------------------------------------------------------
+// IouringTransport::Init
+// -----------------------------------------------------------------------
+
+void IouringTransport::Init(Socket* socket, const SocketOptions& options) {
+ _socket = socket;
+ _default_connect = options.app_connect;
+
+ // Tentatively adopt the caller's edge-trigger callback (or the default
+ // OnNewMessages). This may be cleared below if io_uring polling takes
+ // over the read path.
+ _on_edge_trigger = options.on_edge_triggered_events;
+ if (options.need_on_edge_trigger && _on_edge_trigger == nullptr) {
+ _on_edge_trigger = InputMessenger::OnNewMessages;
+ }
+
+ // Create the endpoint
+ _iouring_ep = new iouring::IouringEndpoint(socket);
+
+ // Register this socket with the Poller. AllocateResources enqueues an
+ // ADD SidOp; the Poller thread picks it up, acquires a read slot (when
+ // --iouring_register_buffers=true) and issues the first SubmitRead.
+ if (iouring::IsIouringAvailable()) {
+ if (_iouring_ep->AllocateResources() < 0) {
+ LOG(WARNING) << "Fail to allocate io_uring resources for "
+ << socket->description() << ", falling back to TCP";
+ delete _iouring_ep;
+ _iouring_ep = nullptr;
+ } else {
+ // io_uring owns the read path regardless of polling mode.
+ // The Poller thread reaps READ / READ_FIXED CQEs and appends data
+ // to socket->_read_buf, then calls ProcessNewMessage directly
+ // (see PollCq).
+ //
+ // We must NOT register an epoll edge-trigger callback here,
+ // because OnNewMessages (the default callback) would call
+ // DoRead() = read(fd), racing with io_uring's reads and stealing
+ // data from the ring – causing partial reads, stalled connections
+ // and protocol parse failures.
+ //
+ // Clearing _on_edge_trigger makes HasOnEdgeTrigger() return
+ // false, so socket.cpp skips AddConsumer(fd) and this fd is
+ // never added to epoll for read events.
+ _on_edge_trigger = nullptr;
+ }
+ }
+
+ // Create the TCP fallback transport (always available).
+ // When _iouring_ep is null (AllocateResources failed) _on_edge_trigger
+ // is still set, so the TCP path works normally via epoll + OnNewMessages.
+ _tcp_transport = std::make_shared<TcpTransport>();
+ _tcp_transport->Init(socket, options);
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::Release
+// -----------------------------------------------------------------------
+
+void IouringTransport::Release() {
+ if (_iouring_ep) {
+ delete _iouring_ep;
+ _iouring_ep = nullptr;
+ }
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::Reset
+// -----------------------------------------------------------------------
+
+int IouringTransport::Reset(int32_t /*expected_nref*/) {
+ if (_iouring_ep) {
+ _iouring_ep->Reset();
+ }
+ return 0;
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::Connect
+// -----------------------------------------------------------------------
+
+std::shared_ptr<AppConnect> IouringTransport::Connect() {
+ return _default_connect;
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::CutFromIOBuf (single-buffer write)
+// -----------------------------------------------------------------------
+
+int IouringTransport::CutFromIOBuf(butil::IOBuf* buf) {
+ // If io_uring is available and has capacity, use it.
+ if (_iouring_ep && iouring::IsIouringAvailable() &&
+ _iouring_ep->IsWritable()) {
+ butil::IOBuf* bufs[1] = {buf};
+ ssize_t nw = _iouring_ep->CutFromIOBufList(bufs, 1);
+ if (nw >= 0) {
+ return 0;
+ }
+ if (errno != EAGAIN) {
+ return -1;
+ }
+ // Fall through to TCP fallback on EAGAIN
+ }
+ // Fallback: synchronous write via the fd
+ return buf->cut_into_file_descriptor(_socket->fd());
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::CutFromIOBufList (multi-buffer write)
+// -----------------------------------------------------------------------
+
+ssize_t IouringTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
+ if (_iouring_ep && iouring::IsIouringAvailable() &&
+ _iouring_ep->IsWritable()) {
+ ssize_t nw = _iouring_ep->CutFromIOBufList(buf, ndata);
+ if (nw >= 0 || errno != EAGAIN) {
+ return nw;
+ }
+ // EAGAIN: fall through to synchronous path
+ }
+ return butil::IOBuf::cut_multiple_into_file_descriptor(
+ _socket->fd(), buf, ndata);
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::WaitEpollOut
+// -----------------------------------------------------------------------
+//
+// When io_uring polling is active the write path is fully async: SQEs are
+// submitted to the ring and completed by the Poller thread. There is no
+// need to block on epoll(EPOLLOUT) – instead we wait on _epollout_butex,
+// which is incremented by Socket::WakeAsEpollOut() each time a write CQE
+// is reaped in IouringEndpoint::PollCq. This keeps the wait/wake path
+// entirely within io_uring / bthread primitives and avoids the extra epoll
+// file descriptor overhead.
+//
+// If io_uring is unavailable (e.g. AllocateResources failed) we fall back
+// to the traditional epoll(EPOLLOUT) path via Socket::WaitEpollOut().
+// -----------------------------------------------------------------------
+
+int IouringTransport::WaitEpollOut(butil::atomic<int>* epollout_butex,
+ bool pollin, timespec duetime) {
+ g_vars->nwaitepollout << 1;
+
+ if (_iouring_ep && iouring::IsIouringAvailable()) {
+ // io_uring path: wait for a write completion to wake us.
+ // PollCq calls dep->_socket->WakeAsEpollOut() on every WRITE/
+ // WRITE_FIXED CQE, which does:
+ // _epollout_butex->fetch_add(1) + butex_wake_except(...)
+ // So we just need to butex_wait here.
+ const int expected =
+ epollout_butex->load(butil::memory_order_acquire);
+ if (bthread::butex_wait(epollout_butex, expected, &duetime) < 0) {
+ if (errno != EAGAIN && errno != ETIMEDOUT) {
+ const int saved_errno = errno;
+ PLOG(WARNING) << "butex_wait failed for " << _socket;
+ _socket->SetFailed(saved_errno,
+ "butex_wait failed for %s: %s",
+ _socket->description().c_str(),
+ berror(saved_errno));
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ // Fallback: io_uring is unavailable for this socket (AllocateResources
+ // failed at Init time, so _iouring_ep is null and writes go through the
+ // synchronous fd path which can return EAGAIN). Fall back to epoll so
+ // that KeepWrite does not busy-spin when the send buffer is full.
+ const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
+ if (rc < 0 && errno != ETIMEDOUT) {
+ const int saved_errno = errno;
+ PLOG(WARNING) << "Fail to wait epollout of " << _socket;
+ _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
+ _socket->description().c_str(),
berror(saved_errno));
+ return 1;
+ }
+ return 0;
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::ProcessEvent
+// -----------------------------------------------------------------------
+
+void IouringTransport::ProcessEvent(bthread_attr_t attr) {
+ bthread_t tid;
+ if (FLAGS_usercode_in_coroutine) {
+ OnEdge(_socket);
+ } else if (bthread_start_background(&tid, &attr, OnEdge, _socket) != 0) {
+ LOG(FATAL) << "Fail to start ProcessEvent bthread";
+ OnEdge(_socket);
+ }
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::QueueMessage
+// -----------------------------------------------------------------------
+
+void IouringTransport::QueueMessage(InputMessageClosure& input_msg,
+ int* num_bthread_created,
+ bool /*last_msg*/) {
+ InputMessageBase* to_run_msg = input_msg.release();
+ if (!to_run_msg) {
+ return;
+ }
+ bthread_t th;
+ bthread_attr_t tmp =
+ (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL) |
+ BTHREAD_NOSIGNAL;
+ tmp.keytable_pool = _socket->keytable_pool();
+ tmp.tag = bthread_self_tag();
+ bthread_attr_set_name(&tmp, "ProcessInputMessage");
+ if (!FLAGS_usercode_in_coroutine &&
+ bthread_start_background(&th, &tmp, ProcessInputMessage, to_run_msg)
== 0) {
+ ++*num_bthread_created;
+ } else {
+ ProcessInputMessage(to_run_msg);
+ }
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::Debug
+// -----------------------------------------------------------------------
+
+void IouringTransport::Debug(std::ostream& os) {
+ if (_iouring_ep) {
+ _iouring_ep->DebugInfo(os);
+ }
+}
+
+// -----------------------------------------------------------------------
+// IouringTransport::ContextInitOrDie (called once at server/channel start)
+// -----------------------------------------------------------------------
+
+int IouringTransport::ContextInitOrDie(bool /*serverOrNot*/,
+ const void* /*_options*/) {
+ iouring::GlobalIouringInitializeOrDie();
+ return 0;
+}
Review Comment:
`IouringTransport::ContextInitOrDie()` only runs
`GlobalIouringInitializeOrDie()` and never starts the per-tag Poller ring(s).
RDMA calls `InitPollingModeWithTag()` from its `ContextInitOrDie()`; without the
equivalent call here, `SOCKET_MODE_IOURING` is unusable unless the application
manually calls `InitPollingModeWithTag()` beforehand.

Meanwhile, `Init()` may still clear `_on_edge_trigger` and so disable epoll
reads. The fd is then no longer watched by epoll, and no Poller exists to issue
the first io_uring read, so connections get stuck. Consider mirroring
`RdmaTransport` and initializing polling mode here based on the server/channel
tag.
##########
src/brpc/iouring/iouring_endpoint.cpp:
##########
@@ -0,0 +1,1016 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_IOURING
+
+#include <errno.h>
+#include <liburing.h>
+#include <sys/uio.h>
+#include <unordered_set>
+#include <string.h>
+
+#include <gflags/gflags.h>
+#include "butil/atomicops.h"
+#include "butil/fd_utility.h"
+#include "butil/logging.h"
+#include "butil/macros.h"
+#include "butil/object_pool.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "bthread/bthread.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/iouring/iouring_block_pool.h"
+#include "brpc/iouring/iouring_helper.h"
+#include "brpc/iouring/iouring_endpoint.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+namespace iouring {
+
+// ---------------------------------------------------------------------------
+// gflags (endpoint-level tunables)
+// ---------------------------------------------------------------------------
+
+// Each bthread_tag has exactly one Poller bthread (and one io_uring ring).
+// io_uring's SQ is single-producer; bthread work-stealing means multiple
+// Pollers per tag could run on different pthreads and race on the SQ.
+// Horizontal scaling is achieved by increasing --task_group_ntags instead.
+
+DEFINE_bool(iouring_poller_yield, false,
+ "Yield (bthread_yield / sched_yield) after each poll iteration. "
+ "Reduces CPU usage at the cost of higher tail latency.");
+
+DEFINE_int32(iouring_max_cqe_poll_once, 32,
+ "Maximum CQEs reaped per io_uring_peek_batch_cqe() call.");
+
+static const int32_t MAX_INFLIGHT_WRITES = 64;
+
+// ---------------------------------------------------------------------------
+// Constructor / Destructor
+// ---------------------------------------------------------------------------
+
+IouringEndpoint::IouringEndpoint(Socket* s)
+ : _socket(s)
+ , _inflight_writes(0)
+{
+ _read_slot = {};
+}
+
+IouringEndpoint::~IouringEndpoint() {
+ Reset();
+}
+
+void IouringEndpoint::Reset() {
+ DeallocateResources();
+ _inflight_writes.store(0, butil::memory_order_relaxed);
+}
+
+// ---------------------------------------------------------------------------
+// Resource management
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::AllocateResources() {
+ // Register this socket with the Poller via the MPSC op_queue.
+ // The Poller thread will dequeue the ADD message on its next iteration
+ // and issue the first SubmitRead there – on the Poller thread, without
+ // any locking.
+ //
+ // Registered-buffer mode: acquire a fixed read slot on the Poller thread
+ // when processing the ADD message (see main loop). Nothing to do here.
+ PollerAddSid();
+ return 0;
+}
+
+void IouringEndpoint::DeallocateResources() {
+ // Embed the current read slot (if any) in the REMOVE message so the
+ // Poller thread can call IouringMemPool::Deallocate(buf) for any
+ // in-flight READ_FIXED buffer that never got a CQE.
+ PollerRemoveSid(_read_slot);
+ _read_slot = {};
+}
+
+// ---------------------------------------------------------------------------
+// Ring / Poller access
+// ---------------------------------------------------------------------------
+
+IouringEndpoint::Poller* IouringEndpoint::GetPoller() const {
+ bthread_tag_t tag = bthread_self_tag();
+ if (tag < 0 || tag >= static_cast<int>(_poller_groups.size())) { tag = 0; }
+ auto& pollers = _poller_groups[tag].pollers;
+ const size_t index = butil::fmix32(_socket->id()) % pollers.size();
+ if (!pollers[index].ring_initialized) { return nullptr; }
+ return &pollers[index];
+}
+
+// ---------------------------------------------------------------------------
+// IouringPollerHandle – implementation
+//
+// ring() and Submit() are implemented here (not in the header) because they
+// need access to IouringEndpoint::Poller and IouringEndpoint::_poller_groups,
+// which are private and not yet fully defined at the point where
+// IouringPollerHandle is declared in iouring_helper.h.
+// ---------------------------------------------------------------------------
+
+int IouringPollerHandle::Submit(
+ std::function<int(::io_uring*)> prepare_fn) const {
+ if (tag_ < 0 || tag_ >= static_cast<int>(
+ IouringEndpoint::_poller_groups.size())) {
+ errno = ENODEV;
+ return -1;
+ }
+ auto& pollers = IouringEndpoint::_poller_groups[tag_].pollers;
+ if (index_ < 0 || index_ >= static_cast<int>(pollers.size())) {
+ errno = ENODEV;
+ return -1;
+ }
+ IouringEndpoint::Poller& poller = pollers[index_];
+ if (!poller.ring_initialized) {
+ errno = ENODEV;
+ return -1;
+ }
+ // Called on the Poller thread (passive path only); no lock needed.
+ int n = prepare_fn(&poller.ring);
+ if (n < 0) { errno = EBUSY; return -1; }
+ if (n == 0) { return 0; }
+ int ret = io_uring_submit(&poller.ring);
+ if (ret < 0) { errno = -ret; return -1; }
+ return ret;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitOneSqe
+//
+// Must be called on the Poller thread; no locking.
+// Gets one SQE, calls |prepare_fn(sqe)|, issues io_uring_submit().
+// Returns io_uring_submit() result (>= 0) or -1 (errno set).
+// errno=ENOBUFS → SQ full
+// errno=ENODEV → ring not yet initialised
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::SubmitOneSqe(
+ std::function<void(struct io_uring_sqe*)> prepare_fn) {
+ Poller* poller = GetPoller();
+ if (!poller) { errno = ENODEV; return -1; }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(&poller->ring);
+ if (!sqe) { errno = ENOBUFS; return -1; }
+ prepare_fn(sqe);
+ int ret = io_uring_submit(&poller->ring);
+ if (ret < 0) { errno = -ret; return -1; }
+ return ret;
+}
+
+// ---------------------------------------------------------------------------
+// Ring parameters factory
+// ---------------------------------------------------------------------------
+
+struct io_uring_params IouringEndpoint::BuildRingParams() {
+ struct io_uring_params p;
+ memset(&p, 0, sizeof(p));
+
+ const IouringPollingMode mode = GetPollingMode();
+
+ switch (mode) {
+ case IouringPollingMode::SQPOLL:
+ case IouringPollingMode::HYBRID:
+ p.flags |= IORING_SETUP_SQPOLL;
+ if (FLAGS_iouring_sqpoll_idle_ms > 0) {
+ p.sq_thread_idle =
static_cast<unsigned>(FLAGS_iouring_sqpoll_idle_ms);
+ }
+ if (FLAGS_iouring_sqpoll_cpu >= 0) {
+ p.flags |= IORING_SETUP_SQ_AFF;
+ p.sq_thread_cpu = static_cast<unsigned>(FLAGS_iouring_sqpoll_cpu);
+ }
+ break;
+ case IouringPollingMode::IOPOLL:
+ p.flags |= IORING_SETUP_IOPOLL;
+ break;
+ case IouringPollingMode::NONE:
+ default:
+ break;
+ }
+
+ if (FLAGS_iouring_cq_size > 0) {
+ p.flags |= IORING_SETUP_CQSIZE;
+ p.cq_entries = static_cast<unsigned>(FLAGS_iouring_cq_size);
+ }
+
+ return p;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitRead
+//
+// Registered mode (--iouring_register_buffers=true):
+// Allocates one block from IouringMemPool (already pre-registered with the
+// ring), stores it in _read_slot, and submits IORING_OP_READ_FIXED.
+// On completion PollCq wraps the block zero-copy in IOBuf; the IOBuf
+// deleter calls IouringMemPool::Deallocate(buf) (thread-safe).
+//
+// Unregistered mode (--iouring_register_buffers=false):
+// IORING_OP_READ into a per-call malloc bounce buffer (64 KiB).
+// The buffer is owned by IOBuf after PollCq and free()'d when consumed.
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::SubmitRead(int fd) {
+ IouringReqContext* ctx = butil::get_object<IouringReqContext>();
+ if (!ctx) { errno = ENOMEM; return -1; }
+ ctx->op = IOURING_OP_READ; // will be overwritten below
+ ctx->fd = fd;
+ ctx->socket_id = _socket->id();
+ ctx->bounce = nullptr;
+
+ if (IsFixedBuffersEnabled()) {
+ // Registered path – allocate a fresh block from IouringMemPool.
+ IouringMemPool& mp = IouringMemPool::Instance();
+ const size_t blksz = mp.block_size();
+ void* buf = mp.Allocate(blksz);
+ if (!buf) {
+ butil::return_object(ctx);
+ errno = ENOMEM;
+ return -1;
+ }
+
+ // Look up the pre-registered buf_index for this block.
+ Poller* poller = GetPoller();
+ if (!poller) {
+ mp.Deallocate(buf);
+ butil::return_object(ctx);
+ errno = ENODEV;
+ return -1;
+ }
+ const int buf_index = mp.GetBufIndex(&poller->ring, buf);
+ if (buf_index < 0) {
+ // Should never happen: the block was just allocated from a
+ // registered region. Log and fail rather than silently.
+ LOG(ERROR) << "io_uring SubmitRead: GetBufIndex returned -1 "
+ "for a freshly allocated block; this is a bug.";
+ mp.Deallocate(buf);
+ butil::return_object(ctx);
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Record the slot so PollCq (and DeallocateResources) can find it.
+ _read_slot.buf = buf;
+ _read_slot.buf_index = buf_index;
+ _read_slot.size = blksz;
+
+ ctx->op = IOURING_OP_READ_FIXED;
+ int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+ io_uring_prep_read_fixed(sqe, fd,
+ buf,
+ static_cast<unsigned>(blksz),
+ /*offset=*/0,
+ buf_index);
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ });
+ if (ret < 0) {
+ // Submission failed; release the block immediately.
+ mp.Deallocate(buf);
+ _read_slot = {};
+ butil::return_object(ctx);
+ return -1;
+ }
+ return 0;
+ }
+
+ // Unregistered path – allocate a temporary bounce buffer.
+ constexpr size_t kBounceSize = 65536;
+ void* bounce = malloc(kBounceSize);
+ if (!bounce) { butil::return_object(ctx); errno = ENOMEM; return -1; }
+ ctx->op = IOURING_OP_READ;
+
+ int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+ io_uring_prep_read(sqe, fd, bounce, static_cast<unsigned>(kBounceSize),
+ /*offset=*/0);
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ });
+ if (ret < 0) { free(bounce); butil::return_object(ctx); return -1; }
+ ctx->bounce = bounce; // PollCq takes ownership and wraps it in IOBuf
+ return 0;
+}
+
+
+ssize_t IouringEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+ CHECK(from != nullptr);
+ CHECK(ndata > 0);
+
+ if (!IsWritable()) { errno = EAGAIN; return -1; }
+
+ const int fd = _socket->fd();
+
+ // ------------------------------------------------------------------
+ // WRITE_FIXED path (--iouring_register_buffers=true)
+ // ------------------------------------------------------------------
+ // Every IOBuf block comes from IouringMemPool (a pre-registered slab).
+ // GetBufIndex() must always succeed; if it returns -1 a block somehow
+ // escaped the pool – LOG(ERROR) and fail rather than silently degrading.
+ //
+ // One IORING_OP_WRITE_FIXED SQE is submitted per block so the kernel can
+ // DMA directly from pinned pages with no per-op get_user_pages overhead.
+ // Called on the Poller thread; no locking needed.
+ // ------------------------------------------------------------------
+ if (IsFixedBuffersEnabled()) {
+ Poller* poller = GetPoller();
+ if (!poller) { errno = ENODEV; return -1; }
+ IouringMemPool& mp = IouringMemPool::Instance();
+ struct io_uring* ring = &poller->ring;
+
+ ssize_t total_bytes = 0;
+ int sqes_queued = 0;
+
+ for (size_t i = 0; i < ndata; ++i) {
+ if (!from[i] || from[i]->empty()) { continue; }
+ butil::IOBuf& buf = *from[i];
+ while (!buf.empty()) {
+ const void* seg_ptr = buf.fetch1();
+ size_t seg_len = buf.backing_block(0).size();
+ if (seg_len == 0) { break; }
+
+ int buf_idx = mp.GetBufIndex(ring, seg_ptr);
+ if (buf_idx < 0) {
+ // Every block must be registered when
+ // --iouring_register_buffers=true. A -1 here means a
+ // block escaped the pool – this is a programming error.
+ LOG(ERROR) << "io_uring: unregistered IOBuf block ptr="
+ << seg_ptr << " fd=" << fd
+ << "; submission aborted.";
+ errno = EINVAL;
+ break;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
+ if (!sqe) { errno = ENOBUFS; break; }
+
+ io_uring_prep_write_fixed(sqe, fd,
+ const_cast<void*>(seg_ptr),
+ static_cast<unsigned>(seg_len),
+ 0, buf_idx);
+ IouringReqContext* ctx =
butil::get_object<IouringReqContext>();
+ if (!ctx) { errno = ENOMEM; break; }
+ ctx->op = IOURING_OP_WRITE_FIXED;
+ ctx->fd = fd;
+ ctx->socket_id = _socket->id();
+ ctx->bounce = nullptr;
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ total_bytes += static_cast<ssize_t>(seg_len);
+ ++sqes_queued;
+ buf.pop_front(seg_len);
+ }
+ }
+
+ if (sqes_queued > 0) {
+ int ret = io_uring_submit(ring);
+ if (ret < 0) {
+ errno = -ret;
+ return total_bytes > 0 ? total_bytes : -1;
+ }
+ _inflight_writes.fetch_add(sqes_queued,
butil::memory_order_relaxed);
+ }
+ return total_bytes > 0 ? total_bytes : (sqes_queued == 0 ? 0 : -1);
+ }
+
+ // ------------------------------------------------------------------
+ // Plain WRITEV path (--iouring_register_buffers=false)
+ // ------------------------------------------------------------------
+ // Collect all iovec entries from the IOBuf list and submit a single
+ // IORING_OP_WRITEV. Cap at IOURING_IOV_MAX to stay within SQ limits.
+ // ------------------------------------------------------------------
+ static const size_t IOURING_IOV_MAX = 256;
+ std::vector<struct iovec> iov;
+ ssize_t total_bytes = 0;
+
+ for (size_t i = 0; i < ndata; ++i) {
+ if (!from[i] || from[i]->empty()) { continue; }
+ butil::IOBuf& buf = *from[i];
+ while (!buf.empty() && iov.size() < IOURING_IOV_MAX) {
+ const void* seg_ptr = buf.fetch1();
+ size_t seg_len = buf.backing_block(0).size();
+ if (seg_len == 0) { break; }
+ iov.push_back({const_cast<void*>(seg_ptr), seg_len});
+ total_bytes += static_cast<ssize_t>(seg_len);
+ buf.pop_front(seg_len);
+ }
+ if (iov.size() >= IOURING_IOV_MAX) { break; }
+ }
+
+ if (iov.empty()) { return 0; }
+
+ IouringReqContext* ctx = butil::get_object<IouringReqContext>();
+ if (!ctx) { errno = ENOMEM; return -1; }
+ ctx->op = IOURING_OP_WRITE;
+ ctx->fd = fd;
+ ctx->socket_id = _socket->id();
+ ctx->bounce = nullptr;
+
+ int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+ io_uring_prep_writev(sqe, fd, iov.data(),
+ static_cast<unsigned>(iov.size()), 0);
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ });
+ if (ret < 0) { butil::return_object(ctx); return -1; }
+ _inflight_writes.fetch_add(1, butil::memory_order_relaxed);
+ return total_bytes;
+}
+
+bool IouringEndpoint::IsWritable() const {
+ return _inflight_writes.load(butil::memory_order_relaxed) <
MAX_INFLIGHT_WRITES;
+}
+
+// ---------------------------------------------------------------------------
+// PollCq – CQE completion handler
+//
+// READ_FIXED path (--iouring_register_buffers=true, zero-copy)
+// -------------------------------------------------------------
+// The kernel has written |res| bytes directly into ep->_read_slot.buf (a
+// pre-registered, pinned page). IOBuf::append_user_data() wraps those bytes
+// without copying; the deleter calls IouringMemPool::Deallocate(buf)
+// (thread-safe) when the last IOBuf reference is dropped.
+// A fresh block is allocated from IouringMemPool for the next READ_FIXED.
+//
+// READ path (--iouring_register_buffers=false)
+// --------------------------------------------
+// ctx->bounce points to a per-call malloc'd bounce buffer (64 KiB).
+// IOBuf takes ownership (free() destructor) when the data is wrapped.
+// ---------------------------------------------------------------------------
+
+void IouringEndpoint::PollCq(Socket* m) {
+ IouringEndpoint* ep = static_cast<IouringEndpoint*>(m->user());
+ if (!ep) { return; }
+
+ SocketUniquePtr s;
+ if (Socket::Address(ep->_socket->id(), &s) < 0) { return; }
+
+ // CQ-side operations need only the ring pointer; no lock required here
+ // because io_uring_peek_batch_cqe / io_uring_cqe_seen operate on the CQ
+ // which is only touched by the Poller thread.
+ Poller* poller = ep->GetPoller();
+ if (!poller) { return; }
+ struct io_uring* ring = &poller->ring;
+
+ struct io_uring_cqe* cqes[FLAGS_iouring_max_cqe_poll_once];
+ InputMessageClosure last_msg;
+ int progress = Socket::PROGRESS_INIT;
+
+ while (true) {
+ const int cnt = io_uring_peek_batch_cqe(
+ ring, cqes,
static_cast<unsigned>(FLAGS_iouring_max_cqe_poll_once));
+
+ if (cnt <= 0) {
+ if (s->Failed()) { return; }
+ if (!m->MoreReadEvents(&progress)) { break; }
+ continue;
+ }
+
+ ssize_t bytes_read = 0;
+
+ for (int i = 0; i < cnt; ++i) {
+ struct io_uring_cqe* cqe = cqes[i];
+ const uint64_t udata = cqe->user_data;
+
+ // Only process CQEs that bRPC submitted (bit 63 == 1).
+ // All other CQEs are user-submitted; leave them in the ring so the
+ // user callback (called right after PollCq returns) can
+ // drain and handle them. Users need no special tagging – bit 63
+ // is never set in a canonical user-space pointer or small integer.
+ if (!(udata & kBrpcCqeTag)) {
+ continue; // do NOT call io_uring_cqe_seen()
+ }
Review Comment:
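`PollCq()` currently `continue`s when it sees a CQE without `kBrpcCqeTag`.
Because the CQ is consumed strictly in FIFO order, this causes two problems:

1. If every CQE in a batch is untagged (e.g. a single user CQE at the head),
   nothing advances the CQ head. The next `io_uring_peek_batch_cqe()` returns
   the same entries, and the loop can spin without making progress.
2. `io_uring_cqe_seen()` advances the CQ head by one regardless of which CQE
   pointer it is given. Marking a later tagged CQE as seen therefore actually
   consumes the untagged CQE at the head, silently dropping user completions.

`PollCq()` should stop at the first untagged CQE in a batch, including when it
is at the head. Only the tagged CQEs before it should be marked as seen, and the
rest should be left for the user callback.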
##########
example/iouring_echo_c++/server.cpp:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// An echo server demonstrating all io_uring transport modes supported by bRPC.
+//
+// ---------------------------------------------------------------------------
+// Design: one-thread-per-ring polling mode
+// ---------------------------------------------------------------------------
+//
+// In this mode a dedicated Poller bthread drives the io_uring ring for each
+// bthread_tag. All SQ operations (SubmitRead, CutFromIOBufList) execute on
+// that Poller thread, so no locking is needed on the hot I/O path.
+//
+// ---------------------------------------------------------------------------
+// Quick-start examples
+// ---------------------------------------------------------------------------
+//
+// # Default: epoll (no io_uring)
+// ./echo_server
+//
+// # io_uring, interrupt-driven Poller (none mode, 1 ms timeout between peeks)
+// ./echo_server --use_iouring
+//
+// # io_uring + SQPOLL mode (lowest latency; needs CAP_SYS_NICE / root)
+// ./echo_server --use_iouring --iouring_polling_mode=sqpoll
+//
+// # io_uring + hybrid mode (busy-spin N times then block)
+// ./echo_server --use_iouring --iouring_polling_mode=hybrid
+//
+// # io_uring + zero-copy registered buffers (highest throughput)
+// ./echo_server --use_iouring --iouring_register_buffers
+//
+// # io_uring + SQPOLL + zero-copy (latency + throughput optimised)
+// ./echo_server --use_iouring --iouring_polling_mode=sqpoll \
+// --iouring_register_buffers
+//
+// ---------------------------------------------------------------------------
+// Tunables (all --iouring_* flags are forwarded to the transport layer)
+// ---------------------------------------------------------------------------
+//
+// Ring sizing
+// --iouring_sq_size=N SQ depth per ring (default 256).
+// --iouring_cq_size=N CQ depth per ring (0 = 2 * sq_size).
+//
+// Poller thread
+// --iouring_poller_yield Yield after each poll iteration (reduces CPU,
+// increases tail latency).
+// --iouring_max_cqe_poll_once=N Max CQEs reaped per peek call (default 32).
+//
+// Polling mode (selects ring setup flags and CQE-reap strategy)
+// --iouring_polling_mode= none No kernel-side polling; Poller waits up
+// to 1 ms between peeks (default).
+// sqpoll Kernel SQ polling thread (IORING_SETUP_SQPOLL).
+// iopoll Block-device completion polling (O_DIRECT only).
+// hybrid Busy-spin N times then block.
+// --iouring_sqpoll_idle_ms SQPOLL kernel thread idle timeout ms (default 2000).
+// --iouring_sqpoll_cpu=N CPU to pin the SQPOLL thread (-1 = no pin).
+// --iouring_hybrid_spin_count=N Spin iterations for hybrid mode (default 1000).
+//
+// Registered buffers (zero-copy)
+// --iouring_register_buffers Enable READ_FIXED / WRITE_FIXED.
+// --iouring_mem_pool_initial_mb=N Initial registered memory (default 256 MiB).
+// --iouring_mem_pool_increase_mb=N Growth step (default 256 MiB).
+// --iouring_mem_pool_max_regions=N Max growth regions (default 8).
+// --iouring_iobuf_block_size=N IOBuf block / slot size in bytes (default 8192).
+// --iouring_read_slot_num=N Initial read slots per ring (default 256).
+// --iouring_read_slot_max=N Max read slots per ring (default 4096).
Review Comment:
The example comments mention `--iouring_read_slot_num` /
`--iouring_read_slot_max`, but these flags are not implemented (there are no
gflags definitions for them). This can confuse users trying to run or tune the
example. Either remove these two lines or add the corresponding flag
definitions.
##########
src/brpc/iouring/iouring_endpoint.cpp:
##########
@@ -0,0 +1,1016 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_IOURING
+
+#include <errno.h>
+#include <liburing.h>
+#include <sys/uio.h>
+#include <unordered_set>
+#include <string.h>
+
+#include <gflags/gflags.h>
+#include "butil/atomicops.h"
+#include "butil/fd_utility.h"
+#include "butil/logging.h"
+#include "butil/macros.h"
+#include "butil/object_pool.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "bthread/bthread.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/iouring/iouring_block_pool.h"
+#include "brpc/iouring/iouring_helper.h"
+#include "brpc/iouring/iouring_endpoint.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+namespace iouring {
+
+// ---------------------------------------------------------------------------
+// gflags (endpoint-level tunables)
+// ---------------------------------------------------------------------------
+
+// Each bthread_tag has exactly one Poller bthread (and one io_uring ring).
+// io_uring's SQ is single-producer; bthread work-stealing means multiple
+// Pollers per tag could run on different pthreads and race on the SQ.
+// Horizontal scaling is achieved by increasing --task_group_ntags instead.
+
+// Whether the Poller yields after every loop iteration.
+DEFINE_bool(iouring_poller_yield, false,
+ "Yield (bthread_yield / sched_yield) after each poll iteration. "
+ "Reduces CPU usage at the cost of higher tail latency.");
+
+// Batch size for CQE reaping; PollCq sizes its CQE pointer array from it.
+DEFINE_int32(iouring_max_cqe_poll_once, 32,
+ "Maximum CQEs reaped per io_uring_peek_batch_cqe() call.");
+
+// Upper bound on write operations an endpoint may have awaiting completion.
+// Once _inflight_writes reaches it, IsWritable() returns false and
+// CutFromIOBufList() fails with EAGAIN. The counter is bumped once per
+// WRITE_FIXED SQE (registered mode) or once per WRITEV SQE (plain mode).
+static const int32_t MAX_INFLIGHT_WRITES = 64;
+
+// ---------------------------------------------------------------------------
+// Constructor / Destructor
+// ---------------------------------------------------------------------------
+
+// Binds the endpoint to its owning socket with no in-flight writes and an
+// empty read slot. No I/O is issued here; the first read is submitted on the
+// Poller thread once AllocateResources() has queued an ADD message.
+IouringEndpoint::IouringEndpoint(Socket* s)
+    : _socket(s), _inflight_writes(0) {
+    _read_slot = IouringReadSlot{};
+}
+
+// Tears the endpoint down through the same path as an explicit Reset().
+IouringEndpoint::~IouringEndpoint() {
+    this->Reset();
+}
+
+// Hands the endpoint's resources back to the Poller (DeallocateResources)
+// and then zeroes the in-flight write counter.
+void IouringEndpoint::Reset() {
+    this->DeallocateResources();
+    _inflight_writes.store(0, butil::memory_order_relaxed);
+}
+
+// ---------------------------------------------------------------------------
+// Resource management
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::AllocateResources() {
+    // Nothing is allocated on the caller's thread. An ADD message is pushed
+    // onto the Poller's MPSC op_queue; the Poller dequeues it on its next
+    // iteration and issues the first SubmitRead() itself, so no locking is
+    // needed. In registered-buffer mode the fixed read block is also taken
+    // there, inside SubmitRead().
+    PollerAddSid();
+    return 0;
+}
+
+void IouringEndpoint::DeallocateResources() {
+    // Ship the current read slot (possibly empty) inside the REMOVE message
+    // so the Poller thread can return a READ_FIXED block to IouringMemPool
+    // if it never produced a CQE; then forget the slot locally.
+    PollerRemoveSid(_read_slot);
+    _read_slot = IouringReadSlot{};
+}
+
+// ---------------------------------------------------------------------------
+// Ring / Poller access
+// ---------------------------------------------------------------------------
+
+// Returns the Poller whose ring serves this socket for the calling bthread's
+// tag, or nullptr when no usable ring exists:
+//   - GlobalInitialize() has not created any poller groups yet;
+//   - the selected tag's group has no pollers;
+//   - the selected poller's ring is not initialised.
+// Out-of-range tags fall back to tag 0. Within a group the poller is chosen
+// by hashing the socket id, so for a given tag a socket always maps to the
+// same ring.
+IouringEndpoint::Poller* IouringEndpoint::GetPoller() const {
+    // Indexing _poller_groups[0] on an empty vector would be out of bounds.
+    if (_poller_groups.empty()) { return nullptr; }
+    bthread_tag_t tag = bthread_self_tag();
+    if (tag < 0 || tag >= static_cast<int>(_poller_groups.size())) { tag = 0; }
+    auto& pollers = _poller_groups[tag].pollers;
+    // Guard the modulo below: an empty group would divide by zero.
+    if (pollers.empty()) { return nullptr; }
+    const size_t index = butil::fmix32(_socket->id()) % pollers.size();
+    if (!pollers[index].ring_initialized) { return nullptr; }
+    return &pollers[index];
+}
+
+// ---------------------------------------------------------------------------
+// IouringPollerHandle – implementation
+//
+// ring() and Submit() are implemented here (not in the header) because they
+// need access to IouringEndpoint::Poller and IouringEndpoint::_poller_groups,
+// which are private and not yet fully defined at the point where
+// IouringPollerHandle is declared in iouring_helper.h.
+// ---------------------------------------------------------------------------
+
+// Runs |prepare_fn| against the ring identified by (tag_, index_) and
+// submits whatever it prepared.
+// Returns io_uring_submit()'s result, 0 when nothing was prepared, or -1:
+//   errno=ENODEV - no such tag/poller, or its ring is not initialised
+//   errno=EBUSY  - prepare_fn reported failure (negative return)
+//   otherwise    - errno from io_uring_submit()
+int IouringPollerHandle::Submit(
+    std::function<int(::io_uring*)> prepare_fn) const {
+    auto& groups = IouringEndpoint::_poller_groups;
+    const bool tag_ok =
+        tag_ >= 0 && tag_ < static_cast<int>(groups.size());
+    if (!tag_ok) {
+        errno = ENODEV;
+        return -1;
+    }
+    auto& pollers = groups[tag_].pollers;
+    const bool index_ok =
+        index_ >= 0 && index_ < static_cast<int>(pollers.size());
+    if (!index_ok || !pollers[index_].ring_initialized) {
+        errno = ENODEV;
+        return -1;
+    }
+    ::io_uring* const ring = &pollers[index_].ring;
+
+    // Runs on the Poller thread (passive path only), so the SQ needs no lock.
+    const int prepared = prepare_fn(ring);
+    if (prepared < 0) {
+        errno = EBUSY;
+        return -1;
+    }
+    if (prepared == 0) {
+        return 0;
+    }
+    const int submitted = io_uring_submit(ring);
+    if (submitted < 0) {
+        errno = -submitted;
+        return -1;
+    }
+    return submitted;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitOneSqe
+//
+// Must be called on the Poller thread; no locking.
+// Gets one SQE, calls |prepare_fn(sqe)|, issues io_uring_submit().
+// Returns io_uring_submit() result (>= 0) or -1 (errno set).
+// errno=ENOBUFS → SQ full
+// errno=ENODEV → ring not yet initialised
+// ---------------------------------------------------------------------------
+
+// Takes one SQE from this socket's ring, lets |prepare_fn| fill it and
+// submits it. Must run on the Poller thread; no locking.
+// Returns io_uring_submit()'s result (>= 0) or -1 with errno set
+// (ENODEV: no ring yet, ENOBUFS: SQ full, else io_uring_submit's error).
+//
+// NOTE(review): if io_uring_submit() itself fails, liburing has presumably
+// already published the SQE in the SQ tail, so a later submit will still
+// issue it. Callers that release the SQE's user_data on failure should
+// double-check that assumption.
+int IouringEndpoint::SubmitOneSqe(
+    std::function<void(struct io_uring_sqe*)> prepare_fn) {
+    Poller* const poller = GetPoller();
+    if (poller == nullptr) {
+        errno = ENODEV;
+        return -1;
+    }
+    struct io_uring* const ring = &poller->ring;
+    struct io_uring_sqe* const sqe = io_uring_get_sqe(ring);
+    if (sqe == nullptr) {
+        errno = ENOBUFS;  // submission queue is full
+        return -1;
+    }
+    prepare_fn(sqe);
+    const int submitted = io_uring_submit(ring);
+    if (submitted >= 0) {
+        return submitted;
+    }
+    errno = -submitted;
+    return -1;
+}
+
+// ---------------------------------------------------------------------------
+// Ring parameters factory
+// ---------------------------------------------------------------------------
+
+// Builds the io_uring_params used to create a Poller ring from the polling
+// mode and the --iouring_* flags.
+struct io_uring_params IouringEndpoint::BuildRingParams() {
+    struct io_uring_params params;
+    memset(&params, 0, sizeof(params));
+
+    const IouringPollingMode mode = GetPollingMode();
+    const bool use_sqpoll = mode == IouringPollingMode::SQPOLL ||
+                            mode == IouringPollingMode::HYBRID;
+    if (use_sqpoll) {
+        // Kernel-side SQ polling thread; idle timeout and CPU pin are
+        // applied only when the corresponding flag is set.
+        params.flags |= IORING_SETUP_SQPOLL;
+        if (FLAGS_iouring_sqpoll_idle_ms > 0) {
+            params.sq_thread_idle =
+                static_cast<unsigned>(FLAGS_iouring_sqpoll_idle_ms);
+        }
+        if (FLAGS_iouring_sqpoll_cpu >= 0) {
+            params.flags |= IORING_SETUP_SQ_AFF;
+            params.sq_thread_cpu =
+                static_cast<unsigned>(FLAGS_iouring_sqpoll_cpu);
+        }
+    } else if (mode == IouringPollingMode::IOPOLL) {
+        params.flags |= IORING_SETUP_IOPOLL;
+    }
+    // NONE (and any unrecognised mode) adds no setup flags.
+
+    // A positive CQ size overrides the kernel's default CQ depth.
+    if (FLAGS_iouring_cq_size > 0) {
+        params.flags |= IORING_SETUP_CQSIZE;
+        params.cq_entries = static_cast<unsigned>(FLAGS_iouring_cq_size);
+    }
+    return params;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitRead
+//
+// Registered mode (--iouring_register_buffers=true):
+// Allocates one block from IouringMemPool (already pre-registered with the
+// ring), stores it in _read_slot, and submits IORING_OP_READ_FIXED.
+// On completion PollCq wraps the block zero-copy in IOBuf; the IOBuf
+// deleter calls IouringMemPool::Deallocate(buf) (thread-safe).
+//
+// Unregistered mode (--iouring_register_buffers=false):
+// IORING_OP_READ into a per-call malloc bounce buffer (64 KiB).
+// The buffer is owned by IOBuf after PollCq and free()'d when consumed.
+// ---------------------------------------------------------------------------
+
+// Arms one read on |fd| for this socket. Returns 0 on success or -1 with
+// errno set (ENOMEM, ENODEV, EINVAL, or SubmitOneSqe's errno).
+// Must run on the Poller thread.
+int IouringEndpoint::SubmitRead(int fd) {
+    IouringReqContext* const ctx = butil::get_object<IouringReqContext>();
+    if (ctx == nullptr) {
+        errno = ENOMEM;
+        return -1;
+    }
+    ctx->op = IOURING_OP_READ;  // refined per path below
+    ctx->fd = fd;
+    ctx->socket_id = _socket->id();
+    ctx->bounce = nullptr;
+
+    // Returns |ctx| to its pool and reports |err| through errno.
+    auto give_up = [ctx](int err) {
+        butil::return_object(ctx);
+        errno = err;
+        return -1;
+    };
+
+    if (!IsFixedBuffersEnabled()) {
+        // Unregistered path: IORING_OP_READ into a private 64 KiB malloc'd
+        // buffer; PollCq later hands it to IOBuf, which free()s it.
+        constexpr size_t kBounceSize = 65536;
+        void* const bounce = malloc(kBounceSize);
+        if (bounce == nullptr) {
+            return give_up(ENOMEM);
+        }
+        ctx->op = IOURING_OP_READ;
+        const int rc = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+            io_uring_prep_read(sqe, fd, bounce,
+                               static_cast<unsigned>(kBounceSize),
+                               /*offset=*/0);
+            sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+        });
+        if (rc < 0) {
+            free(bounce);
+            butil::return_object(ctx);
+            return -1;
+        }
+        ctx->bounce = bounce;  // PollCq takes ownership and wraps it in IOBuf
+        return 0;
+    }
+
+    // Registered path: take a block from IouringMemPool, whose regions are
+    // pre-registered with the ring, and issue IORING_OP_READ_FIXED into it.
+    IouringMemPool& pool = IouringMemPool::Instance();
+    const size_t block_size = pool.block_size();
+    void* const block = pool.Allocate(block_size);
+    if (block == nullptr) {
+        return give_up(ENOMEM);
+    }
+    Poller* const poller = GetPoller();
+    if (poller == nullptr) {
+        pool.Deallocate(block);
+        return give_up(ENODEV);
+    }
+    const int buf_index = pool.GetBufIndex(&poller->ring, block);
+    if (buf_index < 0) {
+        // A freshly allocated block must belong to a registered region.
+        LOG(ERROR) << "io_uring SubmitRead: GetBufIndex returned -1 "
+                      "for a freshly allocated block; this is a bug.";
+        pool.Deallocate(block);
+        return give_up(EINVAL);
+    }
+
+    // Remember the block so PollCq (and DeallocateResources) can find it.
+    _read_slot.buf = block;
+    _read_slot.buf_index = buf_index;
+    _read_slot.size = block_size;
+
+    ctx->op = IOURING_OP_READ_FIXED;
+    const int rc = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+        io_uring_prep_read_fixed(sqe, fd, block,
+                                 static_cast<unsigned>(block_size),
+                                 /*offset=*/0, buf_index);
+        sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+    });
+    if (rc < 0) {
+        // Submission failed; release the block immediately.
+        pool.Deallocate(block);
+        _read_slot = IouringReadSlot{};
+        butil::return_object(ctx);
+        return -1;
+    }
+    return 0;
+}
+
+
+// Hands as much of |from[0..ndata)| to the kernel as the ring and the
+// in-flight budget allow, and cuts the submitted bytes from the IOBufs.
+// Must be called on the Poller thread; no locking.
+//
+// Returns the number of bytes submitted (> 0), 0 when every IOBuf was
+// empty, or -1 with errno set:
+//   EAGAIN  - MAX_INFLIGHT_WRITES writes are already outstanding
+//   ENODEV  - no initialised ring for this socket
+//   ENOBUFS - SQ full before anything could be queued
+//   ENOMEM  - request-context allocation failed
+//   EINVAL  - (fixed mode) a block does not come from IouringMemPool
+//   other   - error from io_uring_submit() (WRITEV path)
+//
+// NOTE(review): submitted bytes are popped from the IOBufs before the CQE
+// arrives and nothing here keeps a reference to them. The memory the SQEs
+// point at may therefore be released while the kernel is still reading it.
+// Holding the IOBuf in IouringReqContext until completion would close that
+// gap - TODO.
+// NOTE(review): WRITE_FIXED SQEs for one stream socket are not linked
+// (IOSQE_IO_LINK), so io_uring does not guarantee their order. With SQPOLL
+// the local iovec array of the WRITEV path may also be read by the kernel
+// thread after this function returns.
+ssize_t IouringEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+    CHECK(from != nullptr);
+    CHECK(ndata > 0);
+
+    if (!IsWritable()) { errno = EAGAIN; return -1; }
+
+    const int fd = _socket->fd();
+
+    // ------------------------------------------------------------------
+    // WRITE_FIXED path (--iouring_register_buffers=true)
+    // ------------------------------------------------------------------
+    // One IORING_OP_WRITE_FIXED SQE per IOBuf block. Every block must come
+    // from IouringMemPool (a pre-registered slab); a block without a
+    // buf_index is a programming error and aborts the submission.
+    // ------------------------------------------------------------------
+    if (IsFixedBuffersEnabled()) {
+        Poller* poller = GetPoller();
+        if (!poller) { errno = ENODEV; return -1; }
+        IouringMemPool& mp = IouringMemPool::Instance();
+        struct io_uring* ring = &poller->ring;
+
+        // Never queue more SQEs than the remaining in-flight budget, so a
+        // single call cannot overshoot MAX_INFLIGHT_WRITES.
+        const int budget = MAX_INFLIGHT_WRITES - static_cast<int>(
+            _inflight_writes.load(butil::memory_order_relaxed));
+        ssize_t total_bytes = 0;
+        int sqes_queued = 0;
+        int error = 0;  // errno of the condition that stopped queuing
+
+        // Stop at the first error across ALL buffers: writing a later IOBuf
+        // after an earlier one stalled would reorder the byte stream.
+        for (size_t i = 0; i < ndata && error == 0 && sqes_queued < budget;
+             ++i) {
+            if (!from[i] || from[i]->empty()) { continue; }
+            butil::IOBuf& buf = *from[i];
+            while (!buf.empty() && sqes_queued < budget) {
+                const void* seg_ptr = buf.fetch1();
+                const size_t seg_len = buf.backing_block(0).size();
+                if (seg_len == 0) { break; }
+
+                const int buf_idx = mp.GetBufIndex(ring, seg_ptr);
+                if (buf_idx < 0) {
+                    LOG(ERROR) << "io_uring: unregistered IOBuf block ptr="
+                               << seg_ptr << " fd=" << fd
+                               << "; submission aborted.";
+                    error = EINVAL;
+                    break;
+                }
+
+                // Allocate the context BEFORE taking an SQE. An SQE taken
+                // from the ring is issued by the next io_uring_submit() no
+                // matter what, so it must never be left half-prepared with a
+                // stale user_data (that would write the segment twice and
+                // hand PollCq a recycled context).
+                IouringReqContext* ctx =
+                    butil::get_object<IouringReqContext>();
+                if (!ctx) { error = ENOMEM; break; }
+                struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
+                if (!sqe) {
+                    butil::return_object(ctx);
+                    error = ENOBUFS;
+                    break;
+                }
+
+                ctx->op = IOURING_OP_WRITE_FIXED;
+                ctx->fd = fd;
+                ctx->socket_id = _socket->id();
+                ctx->bounce = nullptr;
+                io_uring_prep_write_fixed(sqe, fd,
+                                          const_cast<void*>(seg_ptr),
+                                          static_cast<unsigned>(seg_len),
+                                          0, buf_idx);
+                sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+                total_bytes += static_cast<ssize_t>(seg_len);
+                ++sqes_queued;
+                buf.pop_front(seg_len);
+            }
+        }
+
+        if (sqes_queued > 0) {
+            const int ret = io_uring_submit(ring);
+            if (ret < 0) {
+                // The bytes are already cut from the IOBufs; report them.
+                errno = -ret;
+                return total_bytes;
+            }
+            _inflight_writes.fetch_add(sqes_queued,
+                                       butil::memory_order_relaxed);
+            return total_bytes;
+        }
+        // Nothing queued: surface the error instead of a misleading 0.
+        if (error != 0) { errno = error; return -1; }
+        return 0;
+    }
+
+    // ------------------------------------------------------------------
+    // Plain WRITEV path (--iouring_register_buffers=false)
+    // ------------------------------------------------------------------
+    // Gather up to IOURING_IOV_MAX iovecs WITHOUT modifying the IOBufs,
+    // submit one IORING_OP_WRITEV, and cut the bytes only after the SQE was
+    // accepted. A failed submission then leaves the data in place for a
+    // retry instead of silently dropping it.
+    // ------------------------------------------------------------------
+    static const size_t IOURING_IOV_MAX = 256;
+    std::vector<struct iovec> iov;
+    ssize_t total_bytes = 0;
+
+    for (size_t i = 0; i < ndata && iov.size() < IOURING_IOV_MAX; ++i) {
+        if (!from[i] || from[i]->empty()) { continue; }
+        const butil::IOBuf& buf = *from[i];
+        const size_t nblocks = buf.backing_block_num();
+        for (size_t j = 0; j < nblocks && iov.size() < IOURING_IOV_MAX; ++j) {
+            const butil::StringPiece blk = buf.backing_block(j);
+            if (blk.empty()) { continue; }
+            iov.push_back({const_cast<char*>(blk.data()), blk.size()});
+            total_bytes += static_cast<ssize_t>(blk.size());
+        }
+    }
+
+    if (iov.empty()) { return 0; }
+
+    IouringReqContext* ctx = butil::get_object<IouringReqContext>();
+    if (!ctx) { errno = ENOMEM; return -1; }
+    ctx->op = IOURING_OP_WRITE;
+    ctx->fd = fd;
+    ctx->socket_id = _socket->id();
+    ctx->bounce = nullptr;
+
+    int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+        io_uring_prep_writev(sqe, fd, iov.data(),
+                             static_cast<unsigned>(iov.size()), 0);
+        sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+    });
+    if (ret < 0) { butil::return_object(ctx); return -1; }
+    _inflight_writes.fetch_add(1, butil::memory_order_relaxed);
+
+    // The blocks were gathered front-to-back and only the last touched
+    // IOBuf can be partial, so cutting min(remaining, size) in order
+    // removes exactly the submitted bytes.
+    size_t remaining = static_cast<size_t>(total_bytes);
+    for (size_t i = 0; i < ndata && remaining > 0; ++i) {
+        if (!from[i]) { continue; }
+        const size_t len = from[i]->size();
+        const size_t n = len < remaining ? len : remaining;
+        from[i]->pop_front(n);
+        remaining -= n;
+    }
+    return total_bytes;
+}
+
+bool IouringEndpoint::IsWritable() const {
+    // Back-pressure: refuse new writes once MAX_INFLIGHT_WRITES write
+    // operations are awaiting completion.
+    const auto inflight = _inflight_writes.load(butil::memory_order_relaxed);
+    return inflight < MAX_INFLIGHT_WRITES;
+}
+
+// ---------------------------------------------------------------------------
+// PollCq – CQE completion handler
+//
+// READ_FIXED path (--iouring_register_buffers=true, zero-copy)
+// -------------------------------------------------------------
+// The kernel has written |res| bytes directly into ep->_read_slot.buf (a
+// pre-registered, pinned page). IOBuf::append_user_data() wraps those bytes
+// without copying; the deleter calls IouringMemPool::Deallocate(buf)
+// (thread-safe) when the last IOBuf reference is dropped.
+// A fresh block is allocated from IouringMemPool for the next READ_FIXED.
+//
+// READ path (--iouring_register_buffers=false)
+// --------------------------------------------
+// ctx->bounce points to a per-call malloc'd bounce buffer (64 KiB).
+// IOBuf takes ownership (free() destructor) when the data is wrapped.
+// ---------------------------------------------------------------------------
+
+// Reaps CQEs from the ring serving |m| and dispatches them.
+//
+// CQE consumption:
+//   CQEs are consumed strictly in ring order. io_uring_cqe_seen() only
+//   advances the CQ head by one; it does not mark a particular CQE. So
+//   skipping a CQE while marking later ones seen would consume the skipped
+//   CQE and leave already-processed ones at the head to be handled twice.
+//   Reaping therefore stops at the first CQE that bRPC did not submit
+//   (bit 63 clear) and leaves it, and everything behind it, for the user
+//   callback.
+//
+// Write completions:
+//   Every write completion (success, error or cancellation) returns one
+//   unit of in-flight budget to the endpoint that issued it. A failed write
+//   therefore cannot leave IsWritable() stuck at false.
+//
+// NOTE(review): a ring is shared by all sockets hashed onto the same Poller.
+// Read completions are nevertheless appended to |m| and re-armed through
+// |ep| even when ctx->socket_id names another socket. Write completions
+// already route by socket_id; reads presumably need the same - TODO.
+// NOTE(review): |ep| is taken from m->user() while the messenger is taken
+// from s->user() for what looks like the same socket. Confirm which object
+// each user() really holds.
+void IouringEndpoint::PollCq(Socket* m) {
+    IouringEndpoint* ep = static_cast<IouringEndpoint*>(m->user());
+    if (!ep) { return; }
+
+    SocketUniquePtr s;
+    if (Socket::Address(ep->_socket->id(), &s) < 0) { return; }
+
+    // CQ-side operations need only the ring pointer. The CQ is only touched
+    // by the Poller thread, so no lock is taken.
+    Poller* poller = ep->GetPoller();
+    if (!poller) { return; }
+    struct io_uring* ring = &poller->ring;
+
+    // Heap storage instead of a variable-length array (not standard C++).
+    // Clamp to >= 1 so a non-positive flag cannot produce an empty batch.
+    const int batch_size = FLAGS_iouring_max_cqe_poll_once > 0
+                               ? FLAGS_iouring_max_cqe_poll_once : 1;
+    std::vector<struct io_uring_cqe*> cqes(static_cast<size_t>(batch_size));
+
+    // Returns one unit of write budget to the endpoint that issued |ctx| and
+    // wakes its writer. The CQE may belong to another socket on this ring.
+    // |owner| keeps that socket alive while its endpoint is used. If the
+    // socket is already gone, nothing is charged (never |ep| by mistake).
+    auto finish_write = [ep](const IouringReqContext* ctx) {
+        IouringEndpoint* dep = ep;
+        SocketUniquePtr owner;
+        if (ctx->socket_id != ep->_socket->id()) {
+            dep = nullptr;
+            if (Socket::Address(ctx->socket_id, &owner) == 0) {
+                dep = static_cast<IouringEndpoint*>(owner->user());
+            }
+        }
+        if (dep) {
+            dep->_inflight_writes.fetch_sub(1, butil::memory_order_relaxed);
+            dep->_socket->WakeAsEpollOut();
+        }
+    };
+
+    InputMessageClosure last_msg;
+    int progress = Socket::PROGRESS_INIT;
+
+    while (true) {
+        const int cnt = io_uring_peek_batch_cqe(
+            ring, cqes.data(), static_cast<unsigned>(batch_size));
+        ssize_t bytes_read = 0;
+        // True when reaping stopped at a CQE that bRPC did not submit.
+        bool foreign_at_head = false;
+
+        for (int i = 0; i < cnt; ++i) {
+            struct io_uring_cqe* cqe = cqes[i];
+            const uint64_t udata = cqe->user_data;
+
+            // User-submitted CQE (bit 63 clear): stop WITHOUT calling
+            // io_uring_cqe_seen(), so it stays at the CQ head for the user
+            // callback that runs after PollCq returns.
+            if (!(udata & kBrpcCqeTag)) {
+                foreign_at_head = true;
+                break;
+            }
+
+            // Copy what is needed before advancing the head; afterwards the
+            // kernel may reuse this CQ slot.
+            const int res = cqe->res;
+            io_uring_cqe_seen(ring, cqe);
+
+            // Strip the tag bit to recover the IouringReqContext*.
+            // udata == kBrpcCqeTag (NOP wake-up SQE) yields nullptr.
+            IouringReqContext* ctx =
+                reinterpret_cast<IouringReqContext*>(
+                    static_cast<uintptr_t>(udata & ~kBrpcCqeTag));
+            if (!ctx) { continue; }
+
+            const bool is_read = ctx->op == IOURING_OP_READ ||
+                                 ctx->op == IOURING_OP_READ_FIXED;
+
+            // ---------------------------------------------------------
+            // Error / cancellation
+            // ---------------------------------------------------------
+            if (res < 0) {
+                if (res != -ECANCELED) {
+                    const int saved_errno = -res;
+                    LOG(WARNING) << "io_uring CQE error fd=" << ctx->fd
+                                 << " socket_id=" << ctx->socket_id
+                                 << " op=" << static_cast<int>(ctx->op)
+                                 << ": " << berror(saved_errno);
+                    SocketUniquePtr cs;
+                    if (Socket::Address(ctx->socket_id, &cs) == 0) {
+                        cs->SetFailed(saved_errno, "io_uring op error: %s",
+                                      berror(saved_errno));
+                    }
+                }
+                // READ_FIXED: the block is still in the owner's _read_slot
+                // and goes back to IouringMemPool through the REMOVE message
+                // sent by DeallocateResources. READ: free the bounce buffer.
+                if (ctx->op == IOURING_OP_READ) { free(ctx->bounce); }
+                if (!is_read) { finish_write(ctx); }
+                butil::return_object(ctx);
+                continue;
+            }
+
+            // ---------------------------------------------------------
+            // WRITE / WRITE_FIXED completion
+            // ---------------------------------------------------------
+            if (!is_read) {
+                finish_write(ctx);
+                butil::return_object(ctx);
+                continue;
+            }
+
+            // ---------------------------------------------------------
+            // READ / READ_FIXED completion
+            // ---------------------------------------------------------
+            if (res == 0) {
+                // EOF
+                SocketUniquePtr cs;
+                if (Socket::Address(ctx->socket_id, &cs) == 0) {
+                    cs->SetEOF();
+                }
+                if (ctx->op == IOURING_OP_READ) { free(ctx->bounce); }
+                butil::return_object(ctx);
+                continue;
+            }
+
+            if (ctx->op == IOURING_OP_READ_FIXED) {
+                // Registered mode - zero-copy rotation:
+                //   1. steal the current slot from the endpoint;
+                //   2. submit the next READ_FIXED right away (SubmitRead
+                //      allocates a fresh block) so back-to-back arrivals
+                //      never stall;
+                //   3. wrap the consumed block in IOBuf. The deleter returns
+                //      it to IouringMemPool when the last reference drops.
+                IouringReadSlot consumed_slot = ep->_read_slot;
+                ep->_read_slot = {};
+                if (consumed_slot.buf == nullptr) {
+                    // The slot was already released (e.g. by
+                    // DeallocateResources); there is nothing to wrap.
+                    butil::return_object(ctx);
+                    continue;
+                }
+                ep->SubmitRead(ctx->fd);
+
+                void* const buf = consumed_slot.buf;
+                butil::IOBuf tmp;
+                tmp.append_user_data(
+                    buf,
+                    static_cast<size_t>(res),
+                    [buf](void* /*ptr*/) {
+                        IouringMemPool::Instance().Deallocate(buf);
+                    });
+                m->_read_buf.append(std::move(tmp));
+            } else {
+                // Unregistered mode - the malloc'd bounce buffer from
+                // SubmitRead is handed to IOBuf, which free()s it.
+                butil::IOBuf tmp;
+                tmp.append_user_data(ctx->bounce,
+                                     static_cast<size_t>(res),
+                                     free);
+                ctx->bounce = nullptr;  // ownership transferred
+                m->_read_buf.append(std::move(tmp));
+
+                ep->SubmitRead(ctx->fd);
+            }
+
+            bytes_read += res;
+            butil::return_object(ctx);
+        }  // for each cqe
+
+        if (bytes_read > 0) {
+            const int64_t received_us = butil::cpuwide_time_us();
+            const int64_t base_realtime =
+                butil::gettimeofday_us() - received_us;
+            InputMessenger* messenger =
+                static_cast<InputMessenger*>(s->user());
+            if (messenger && messenger->ProcessNewMessage(
+                    s.get(), bytes_read, false,
+                    received_us, base_realtime, last_msg) < 0) {
+                return;
+            }
+        }
+
+        // Keep reaping while the last batch was fully ours. Once the ring is
+        // empty, or blocked by a user CQE, consult MoreReadEvents so the
+        // loop terminates instead of spinning on an unconsumable head.
+        if (cnt > 0 && !foreign_at_head) { continue; }
+        if (s->Failed()) { return; }
+        if (!m->MoreReadEvents(&progress)) { break; }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// DebugInfo
+// ---------------------------------------------------------------------------
+
+// Writes the endpoint's io_uring state as key=value pairs separated by
+// |connector|. The read-slot fields are added only in registered-buffer
+// mode while a slot is held.
+void IouringEndpoint::DebugInfo(std::ostream& os,
+                                butil::StringPiece connector) const {
+    const bool fixed_buffers = IsFixedBuffersEnabled();
+    os << "iouring_polling_mode=" << FLAGS_iouring_polling_mode
+       << connector << "iouring_inflight_writes="
+       << _inflight_writes.load(butil::memory_order_relaxed)
+       << connector << "iouring_writable=" << IsWritable()
+       << connector << "iouring_register_buffers=" << fixed_buffers;
+    if (fixed_buffers && _read_slot.buf != nullptr) {
+        // Use |connector| like every other field instead of a hard-coded
+        // space, so callers choosing e.g. "\n" get a consistent layout.
+        os << connector << "buf_index=" << _read_slot.buf_index
+           << connector << "slot_size=" << _read_slot.size;
+    }
+}
+
+// ---------------------------------------------------------------------------
+// GlobalInitialize / GlobalRelease
+// ---------------------------------------------------------------------------
+
+// Creates one (empty) PollerGroup per bthread tag (--task_group_ntags).
+// Rings are created later, per tag, by PollingModeInitialize().
+int IouringEndpoint::GlobalInitialize() {
+    _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
+    return 0;
+}
+
+// Releases every poller group created by GlobalInitialize().
+// This iterates over the groups that actually exist instead of re-reading
+// --task_group_ntags. A release without a prior initialise (or after the
+// flag changed) therefore never names a tag that has no group.
+void IouringEndpoint::GlobalRelease() {
+    const int ngroups = static_cast<int>(_poller_groups.size());
+    for (int tag = 0; tag < ngroups; ++tag) {
+        PollingModeRelease(tag);
+    }
+}
+
+// ---------------------------------------------------------------------------
+// PollerDrainOpQueue
+//
+// Dequeues all pending SidOps from poller->op_queue and applies them:
+// ADD – track the socket, allocate a block from IouringMemPool (fixed
+// mode) via SubmitRead, issue the first read.
+// REMOVE – return any in-flight read buffer to IouringMemPool, stop
+// tracking the socket.
+//
+// Must run exclusively on the Poller thread.
+// ---------------------------------------------------------------------------
+
+void IouringEndpoint::PollerDrainOpQueue(
+    Poller* poller,
+    std::unordered_set<SocketId>& tracked_sids) {
+    SidOp op;
+    while (poller->op_queue.Dequeue(op)) {
+        if (op.type == SidOp::ADD) {
+            tracked_sids.emplace(op.sid);
+            SocketUniquePtr s_add;
+            if (Socket::Address(op.sid, &s_add) == 0) {
+                IouringEndpoint* ep =
+                    static_cast<IouringEndpoint*>(s_add->user());
+                if (ep) {
+                    // Issue the first SubmitRead on the Poller thread.
+                    // In fixed-buffer mode SubmitRead allocates a block from
+                    // IouringMemPool; no separate slot pool is needed.
+                    // NOTE(review): on failure only a warning is logged; the
+                    // socket is left with no read armed - confirm whether it
+                    // should be SetFailed instead.
+                    if (ep->SubmitRead(s_add->fd()) < 0) {
+                        LOG(WARNING)
+                            << "IouringEndpoint: first SubmitRead "
+                               "failed for socket "
+                            << op.sid << ": " << berror();
+                    }
+                }
+            }
+        } else {
+            // REMOVE: op.read_slot is the endpoint's _read_slot as captured
+            // by DeallocateResources(). A non-null buf is a block that
+            // SubmitRead handed to a READ_FIXED whose CQE PollCq has not
+            // consumed. PollCq's -ECANCELED, error and EOF paths leave such a
+            // block untouched (they only free READ bounce buffers), so it is
+            // returned to IouringMemPool here.
+            // NOTE(review): no cancel is issued and nothing waits for that
+            // READ_FIXED's CQE, so the kernel may still write into the block
+            // after it has been recycled. Submitting IORING_OP_ASYNC_CANCEL
+            // and deferring Deallocate until the CQE arrives would avoid it.
+            if (IsFixedBuffersEnabled() && op.read_slot.buf != nullptr) {
+                IouringMemPool::Instance().Deallocate(op.read_slot.buf);
+            }
+            tracked_sids.erase(op.sid);
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// PollingModeInitialize
+// ---------------------------------------------------------------------------
+
+// Poller groups indexed by bthread tag; sized by GlobalInitialize() from
+// --task_group_ntags.
+std::vector<IouringEndpoint::PollerGroup> IouringEndpoint::_poller_groups;
+
+int IouringEndpoint::PollingModeInitialize(
+ bthread_tag_t tag,
+ std::function<void(IouringPollerHandle)> callback,
+ std::function<void(IouringPollerHandle)> init_fn,
+ std::function<void(IouringPollerHandle)> release_fn) {
+
+ if (tag < 0 || tag >= static_cast<int>(_poller_groups.size())) {
+ LOG(ERROR) << "io_uring: invalid bthread tag " << tag;
+ return -1;
+ }
+
+ auto& group = _poller_groups[tag];
+ auto& pollers = group.pollers;
+ auto& running = group.running;
+
+ bool expected = false;
+ if (!running.compare_exchange_strong(expected, true)) { return 0; }
+
+ // -----------------------------------------------------------------------
+ // Poller thread arguments
+ // -----------------------------------------------------------------------
+ struct FnArgs {
+ Poller* poller;
+ std::atomic<bool>* running;
+ bthread_tag_t tag;
+ int index; // poller index within the group
+ };
+
+ // -----------------------------------------------------------------------
+ // Poller thread body
+ // -----------------------------------------------------------------------
+ auto fn = [](void* p) -> void* {
+ std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
+ Poller* poller = args->poller;
+ std::atomic<bool>* running = args->running;
+
+ // 1. Create the ring.
+ struct io_uring_params params = BuildRingParams();
+ const unsigned sq_size = static_cast<unsigned>(GetIouringSqSize());
+
+ int ret = io_uring_queue_init_params(sq_size, &poller->ring, ¶ms);
+ if (ret < 0) {
+ LOG(ERROR) << "io_uring_queue_init_params failed: " <<
berror(-ret);
+ running->store(false, std::memory_order_relaxed);
+ return nullptr;
+ }
+ poller->ring_initialized = true;
+
+ // 2. Initialise the fixed-buffer infrastructure.
+ if (IsFixedBuffersEnabled()) {
+ // 2a. Register this ring with IouringMemPool.
+ // The callback is called for each existing region immediately
+ // (to bring the ring up to date) and for each future region
+ // (when the pool grows). It issues register_buffers_update /
+ // full re-registration to pin the new pages in this ring.
+ IouringMemPool::Instance().AddRingRegistrar(
+ &poller->ring,
+ [poller](void* base, size_t size, size_t block_size,
+ int buf_index_base) {
+ // Build one iovec per block in this new region.
+ const int n = static_cast<int>(size / block_size);
+ std::vector<struct iovec> iovs(n);
+ for (int i = 0; i < n; ++i) {
+ iovs[i].iov_base =
+ static_cast<char*>(base) + i * block_size;
+ iovs[i].iov_len = block_size;
+ }
+ // Register the new region's buffers.
+ // io_uring_register_buffers_update (kernel >= 5.13) allows
+ // incremental updates; fall back to a full re-registration
+ // on older kernels or if the function is unavailable.
+ int ret = -ENOSYS;
+#ifdef IORING_REGISTER_BUFFERS_UPDATE
+ ret = io_uring_register_buffers_update(
+ &poller->ring,
+ static_cast<unsigned>(buf_index_base),
+ iovs.data(),
+ static_cast<unsigned>(n));
+#endif
+ if (ret < 0) {
+ // Full re-registration path.
+ // For the first region just call register_buffers.
+ // For subsequent regions we must rebuild the complete
+ // table; here we only have the new slice so we
+ // attempt a best-effort register and log on failure.
+ if (buf_index_base > 0) {
+ // Unregister previous table before re-registering.
+ io_uring_unregister_buffers(&poller->ring);
+ }
+ int r2 = io_uring_register_buffers(&poller->ring,
+ iovs.data(),
+
static_cast<unsigned>(n));
+ if (r2 < 0) {
+ LOG(ERROR)
+ << "io_uring_register_buffers failed for new "
+ "region (buf_index_base=" << buf_index_base
+ << "): " << berror(-r2)
+ << " – any IOBuf block from this region will "
+ "cause EINVAL on the write path "
+ "(no WRITEV fallback in fixed-buffer
mode).";
+ }
+ }
+ });
+
+ LOG(INFO) << "io_uring fixed-buffer mode active: "
+ "READ_FIXED/WRITE_FIXED enabled.";
+ }
+
+ if (poller->init_fn) {
+ poller->init_fn(IouringPollerHandle(args->tag, args->index));
+ }
+
+ // 3. CQE reap strategy.
+ const IouringPollingMode mode = GetPollingMode();
+ const int hybrid_spins = FLAGS_iouring_hybrid_spin_count;
+ struct io_uring_cqe* cqes[FLAGS_iouring_max_cqe_poll_once]; // used
by SQPOLL/HYBRID peek only
+ std::unordered_set<SocketId> tracked_sids;
+ SidOp op;
+
+ // 4. Main loop.
+ while (running->load(std::memory_order_relaxed)) {
+ // a) Drain op_queue.
+ PollerDrainOpQueue(poller, tracked_sids);
+
+ // b) Reap CQEs.
+ bool got_cqe = false; // used by SQPOLL/IOPOLL/HYBRID branches
+
+ if (mode == IouringPollingMode::NONE) {
+ // Interrupt-driven mode: block up to 1 ms waiting for a CQE.
+ // The 1 ms timeout keeps the Poller loop responsive to new
+ // connections arriving in op_queue without burning CPU when
+ // there is no I/O traffic.
+ struct io_uring_cqe* cqe = nullptr;
+ struct __kernel_timespec ts{0, 1000000}; // 1 ms
+ int r = io_uring_wait_cqe_timeout(&poller->ring, &cqe, &ts);
+ if (r == 0 && cqe) {
+ io_uring_cqe_seen(&poller->ring, cqe);
+ }
Review Comment:
In `IouringPollingMode::NONE`, the poller calls
`io_uring_wait_cqe_timeout()` and then immediately `io_uring_cqe_seen()` on the
returned CQE. This consumes (drops) a completion without processing it in
`PollCq()`, which can lose reads/writes and break the transport. The wait
should be used only as a blocking primitive; CQEs should be left for `PollCq()`
to reap and handle.
##########
src/brpc/iouring/iouring_endpoint.cpp:
##########
@@ -0,0 +1,1016 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_IOURING
+
+#include <errno.h>
+#include <liburing.h>
+#include <sys/uio.h>
+#include <unordered_set>
+#include <string.h>
+
+#include <gflags/gflags.h>
+#include "butil/atomicops.h"
+#include "butil/fd_utility.h"
+#include "butil/logging.h"
+#include "butil/macros.h"
+#include "butil/object_pool.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "bthread/bthread.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/iouring/iouring_block_pool.h"
+#include "brpc/iouring/iouring_helper.h"
+#include "brpc/iouring/iouring_endpoint.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+namespace iouring {
+
+// ---------------------------------------------------------------------------
+// gflags (endpoint-level tunables)
+// ---------------------------------------------------------------------------
+
+// Each bthread_tag has exactly one Poller bthread (and one io_uring ring).
+// io_uring's SQ is single-producer; bthread work-stealing means multiple
+// Pollers per tag could run on different pthreads and race on the SQ.
+// Horizontal scaling is achieved by increasing --task_group_ntags instead.
+
+DEFINE_bool(iouring_poller_yield, false,
+ "Yield (bthread_yield / sched_yield) after each poll iteration. "
+ "Reduces CPU usage at the cost of higher tail latency.");
+
+DEFINE_int32(iouring_max_cqe_poll_once, 32,
+ "Maximum CQEs reaped per io_uring_peek_batch_cqe() call.");
+
+static const int32_t MAX_INFLIGHT_WRITES = 64;
+
+// ---------------------------------------------------------------------------
+// Constructor / Destructor
+// ---------------------------------------------------------------------------
+
+IouringEndpoint::IouringEndpoint(Socket* s)
+ : _socket(s)
+ , _inflight_writes(0)
+{
+ _read_slot = {};
+}
+
+IouringEndpoint::~IouringEndpoint() {
+ Reset();
+}
+
+void IouringEndpoint::Reset() {
+ DeallocateResources();
+ _inflight_writes.store(0, butil::memory_order_relaxed);
+}
+
+// ---------------------------------------------------------------------------
+// Resource management
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::AllocateResources() {
+ // Register this socket with the Poller via the MPSC op_queue.
+ // The Poller thread will dequeue the ADD message on its next iteration
+ // and issue the first SubmitRead there – on the Poller thread, without
+ // any locking.
+ //
+ // Registered-buffer mode: acquire a fixed read slot on the Poller thread
+ // when processing the ADD message (see main loop). Nothing to do here.
+ PollerAddSid();
+ return 0;
+}
+
+void IouringEndpoint::DeallocateResources() {
+ // Embed the current read slot (if any) in the REMOVE message so the
+ // Poller thread can call IouringMemPool::Deallocate(buf) for any
+ // in-flight READ_FIXED buffer that never got a CQE.
+ PollerRemoveSid(_read_slot);
+ _read_slot = {};
+}
+
+// ---------------------------------------------------------------------------
+// Ring / Poller access
+// ---------------------------------------------------------------------------
+
+IouringEndpoint::Poller* IouringEndpoint::GetPoller() const {
+ bthread_tag_t tag = bthread_self_tag();
+ if (tag < 0 || tag >= static_cast<int>(_poller_groups.size())) { tag = 0; }
+ auto& pollers = _poller_groups[tag].pollers;
+ const size_t index = butil::fmix32(_socket->id()) % pollers.size();
+ if (!pollers[index].ring_initialized) { return nullptr; }
+ return &pollers[index];
+}
+
+// ---------------------------------------------------------------------------
+// IouringPollerHandle – implementation
+//
+// ring() and Submit() are implemented here (not in the header) because they
+// need access to IouringEndpoint::Poller and IouringEndpoint::_poller_groups,
+// which are private and not yet fully defined at the point where
+// IouringPollerHandle is declared in iouring_helper.h.
+// ---------------------------------------------------------------------------
+
+int IouringPollerHandle::Submit(
+ std::function<int(::io_uring*)> prepare_fn) const {
+ if (tag_ < 0 || tag_ >= static_cast<int>(
+ IouringEndpoint::_poller_groups.size())) {
+ errno = ENODEV;
+ return -1;
+ }
+ auto& pollers = IouringEndpoint::_poller_groups[tag_].pollers;
+ if (index_ < 0 || index_ >= static_cast<int>(pollers.size())) {
+ errno = ENODEV;
+ return -1;
+ }
+ IouringEndpoint::Poller& poller = pollers[index_];
+ if (!poller.ring_initialized) {
+ errno = ENODEV;
+ return -1;
+ }
+ // Called on the Poller thread (passive path only); no lock needed.
+ int n = prepare_fn(&poller.ring);
+ if (n < 0) { errno = EBUSY; return -1; }
+ if (n == 0) { return 0; }
+ int ret = io_uring_submit(&poller.ring);
+ if (ret < 0) { errno = -ret; return -1; }
+ return ret;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitOneSqe
+//
+// Must be called on the Poller thread; no locking.
+// Gets one SQE, calls |prepare_fn(sqe)|, issues io_uring_submit().
+// Returns io_uring_submit() result (>= 0) or -1 (errno set).
+// errno=ENOBUFS → SQ full
+// errno=ENODEV → ring not yet initialised
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::SubmitOneSqe(
+ std::function<void(struct io_uring_sqe*)> prepare_fn) {
+ Poller* poller = GetPoller();
+ if (!poller) { errno = ENODEV; return -1; }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(&poller->ring);
+ if (!sqe) { errno = ENOBUFS; return -1; }
+ prepare_fn(sqe);
+ int ret = io_uring_submit(&poller->ring);
+ if (ret < 0) { errno = -ret; return -1; }
+ return ret;
+}
+
+// ---------------------------------------------------------------------------
+// Ring parameters factory
+// ---------------------------------------------------------------------------
+
+struct io_uring_params IouringEndpoint::BuildRingParams() {
+ struct io_uring_params p;
+ memset(&p, 0, sizeof(p));
+
+ const IouringPollingMode mode = GetPollingMode();
+
+ switch (mode) {
+ case IouringPollingMode::SQPOLL:
+ case IouringPollingMode::HYBRID:
+ p.flags |= IORING_SETUP_SQPOLL;
+ if (FLAGS_iouring_sqpoll_idle_ms > 0) {
+ p.sq_thread_idle = static_cast<unsigned>(FLAGS_iouring_sqpoll_idle_ms);
+ }
+ if (FLAGS_iouring_sqpoll_cpu >= 0) {
+ p.flags |= IORING_SETUP_SQ_AFF;
+ p.sq_thread_cpu = static_cast<unsigned>(FLAGS_iouring_sqpoll_cpu);
+ }
+ break;
+ case IouringPollingMode::IOPOLL:
+ p.flags |= IORING_SETUP_IOPOLL;
+ break;
+ case IouringPollingMode::NONE:
+ default:
+ break;
+ }
+
+ if (FLAGS_iouring_cq_size > 0) {
+ p.flags |= IORING_SETUP_CQSIZE;
+ p.cq_entries = static_cast<unsigned>(FLAGS_iouring_cq_size);
+ }
+
+ return p;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitRead
+//
+// Registered mode (--iouring_register_buffers=true):
+// Allocates one block from IouringMemPool (already pre-registered with the
+// ring), stores it in _read_slot, and submits IORING_OP_READ_FIXED.
+// On completion PollCq wraps the block zero-copy in IOBuf; the IOBuf
+// deleter calls IouringMemPool::Deallocate(buf) (thread-safe).
+//
+// Unregistered mode (--iouring_register_buffers=false):
+// IORING_OP_READ into a per-call malloc bounce buffer (64 KiB).
+// The buffer is owned by IOBuf after PollCq and free()'d when consumed.
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::SubmitRead(int fd) {
+ IouringReqContext* ctx = butil::get_object<IouringReqContext>();
+ if (!ctx) { errno = ENOMEM; return -1; }
+ ctx->op = IOURING_OP_READ; // will be overwritten below
+ ctx->fd = fd;
+ ctx->socket_id = _socket->id();
+ ctx->bounce = nullptr;
+
+ if (IsFixedBuffersEnabled()) {
+ // Registered path – allocate a fresh block from IouringMemPool.
+ IouringMemPool& mp = IouringMemPool::Instance();
+ const size_t blksz = mp.block_size();
+ void* buf = mp.Allocate(blksz);
+ if (!buf) {
+ butil::return_object(ctx);
+ errno = ENOMEM;
+ return -1;
+ }
+
+ // Look up the pre-registered buf_index for this block.
+ Poller* poller = GetPoller();
+ if (!poller) {
+ mp.Deallocate(buf);
+ butil::return_object(ctx);
+ errno = ENODEV;
+ return -1;
+ }
+ const int buf_index = mp.GetBufIndex(&poller->ring, buf);
+ if (buf_index < 0) {
+ // Should never happen: the block was just allocated from a
+ // registered region. Log and fail rather than silently.
+ LOG(ERROR) << "io_uring SubmitRead: GetBufIndex returned -1 "
+ "for a freshly allocated block; this is a bug.";
+ mp.Deallocate(buf);
+ butil::return_object(ctx);
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Record the slot so PollCq (and DeallocateResources) can find it.
+ _read_slot.buf = buf;
+ _read_slot.buf_index = buf_index;
+ _read_slot.size = blksz;
+
+ ctx->op = IOURING_OP_READ_FIXED;
+ int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+ io_uring_prep_read_fixed(sqe, fd,
+ buf,
+ static_cast<unsigned>(blksz),
+ /*offset=*/0,
+ buf_index);
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ });
+ if (ret < 0) {
+ // Submission failed; release the block immediately.
+ mp.Deallocate(buf);
+ _read_slot = {};
+ butil::return_object(ctx);
+ return -1;
+ }
+ return 0;
+ }
+
+ // Unregistered path – allocate a temporary bounce buffer.
+ constexpr size_t kBounceSize = 65536;
+ void* bounce = malloc(kBounceSize);
+ if (!bounce) { butil::return_object(ctx); errno = ENOMEM; return -1; }
+ ctx->op = IOURING_OP_READ;
+
+ int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+ io_uring_prep_read(sqe, fd, bounce, static_cast<unsigned>(kBounceSize),
+ /*offset=*/0);
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ });
+ if (ret < 0) { free(bounce); butil::return_object(ctx); return -1; }
+ ctx->bounce = bounce; // PollCq takes ownership and wraps it in IOBuf
+ return 0;
+}
+
+
+ssize_t IouringEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+ CHECK(from != nullptr);
+ CHECK(ndata > 0);
+
+ if (!IsWritable()) { errno = EAGAIN; return -1; }
+
+ const int fd = _socket->fd();
+
+ // ------------------------------------------------------------------
+ // WRITE_FIXED path (--iouring_register_buffers=true)
+ // ------------------------------------------------------------------
+ // Every IOBuf block comes from IouringMemPool (a pre-registered slab).
+ // GetBufIndex() must always succeed; if it returns -1 a block somehow
+ // escaped the pool – LOG(ERROR) and fail rather than silently degrading.
+ //
+ // One IORING_OP_WRITE_FIXED SQE is submitted per block so the kernel can
+ // DMA directly from pinned pages with no per-op get_user_pages overhead.
+ // Called on the Poller thread; no locking needed.
+ // ------------------------------------------------------------------
+ if (IsFixedBuffersEnabled()) {
+ Poller* poller = GetPoller();
+ if (!poller) { errno = ENODEV; return -1; }
+ IouringMemPool& mp = IouringMemPool::Instance();
+ struct io_uring* ring = &poller->ring;
+
+ ssize_t total_bytes = 0;
+ int sqes_queued = 0;
+
+ for (size_t i = 0; i < ndata; ++i) {
+ if (!from[i] || from[i]->empty()) { continue; }
+ butil::IOBuf& buf = *from[i];
+ while (!buf.empty()) {
+ const void* seg_ptr = buf.fetch1();
+ size_t seg_len = buf.backing_block(0).size();
+ if (seg_len == 0) { break; }
+
+ int buf_idx = mp.GetBufIndex(ring, seg_ptr);
+ if (buf_idx < 0) {
+ // Every block must be registered when
+ // --iouring_register_buffers=true. A -1 here means a
+ // block escaped the pool – this is a programming error.
+ LOG(ERROR) << "io_uring: unregistered IOBuf block ptr="
+ << seg_ptr << " fd=" << fd
+ << "; submission aborted.";
+ errno = EINVAL;
+ break;
+ }
+
+ struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
+ if (!sqe) { errno = ENOBUFS; break; }
+
+ io_uring_prep_write_fixed(sqe, fd,
+ const_cast<void*>(seg_ptr),
+ static_cast<unsigned>(seg_len),
+ 0, buf_idx);
+ IouringReqContext* ctx = butil::get_object<IouringReqContext>();
+ if (!ctx) { errno = ENOMEM; break; }
+ ctx->op = IOURING_OP_WRITE_FIXED;
+ ctx->fd = fd;
+ ctx->socket_id = _socket->id();
+ ctx->bounce = nullptr;
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ total_bytes += static_cast<ssize_t>(seg_len);
+ ++sqes_queued;
+ buf.pop_front(seg_len);
+ }
+ }
+
+ if (sqes_queued > 0) {
+ int ret = io_uring_submit(ring);
+ if (ret < 0) {
+ errno = -ret;
+ return total_bytes > 0 ? total_bytes : -1;
+ }
+ _inflight_writes.fetch_add(sqes_queued, butil::memory_order_relaxed);
+ }
+ return total_bytes > 0 ? total_bytes : (sqes_queued == 0 ? 0 : -1);
+ }
+
+ // ------------------------------------------------------------------
+ // Plain WRITEV path (--iouring_register_buffers=false)
+ // ------------------------------------------------------------------
+ // Collect all iovec entries from the IOBuf list and submit a single
+ // IORING_OP_WRITEV. Cap at IOURING_IOV_MAX to stay within SQ limits.
+ // ------------------------------------------------------------------
+ static const size_t IOURING_IOV_MAX = 256;
+ std::vector<struct iovec> iov;
+ ssize_t total_bytes = 0;
+
+ for (size_t i = 0; i < ndata; ++i) {
+ if (!from[i] || from[i]->empty()) { continue; }
+ butil::IOBuf& buf = *from[i];
+ while (!buf.empty() && iov.size() < IOURING_IOV_MAX) {
+ const void* seg_ptr = buf.fetch1();
+ size_t seg_len = buf.backing_block(0).size();
+ if (seg_len == 0) { break; }
+ iov.push_back({const_cast<void*>(seg_ptr), seg_len});
+ total_bytes += static_cast<ssize_t>(seg_len);
+ buf.pop_front(seg_len);
+ }
+ if (iov.size() >= IOURING_IOV_MAX) { break; }
+ }
+
+ if (iov.empty()) { return 0; }
+
+ IouringReqContext* ctx = butil::get_object<IouringReqContext>();
+ if (!ctx) { errno = ENOMEM; return -1; }
+ ctx->op = IOURING_OP_WRITE;
+ ctx->fd = fd;
+ ctx->socket_id = _socket->id();
+ ctx->bounce = nullptr;
+
+ int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+ io_uring_prep_writev(sqe, fd, iov.data(),
+ static_cast<unsigned>(iov.size()), 0);
+ sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+ });
+ if (ret < 0) { butil::return_object(ctx); return -1; }
+ _inflight_writes.fetch_add(1, butil::memory_order_relaxed);
+ return total_bytes;
+}
+
+bool IouringEndpoint::IsWritable() const {
+ return _inflight_writes.load(butil::memory_order_relaxed) < MAX_INFLIGHT_WRITES;
+}
+
+// ---------------------------------------------------------------------------
+// PollCq – CQE completion handler
+//
+// READ_FIXED path (--iouring_register_buffers=true, zero-copy)
+// -------------------------------------------------------------
+// The kernel has written |res| bytes directly into ep->_read_slot.buf (a
+// pre-registered, pinned page). IOBuf::append_user_data() wraps those bytes
+// without copying; the deleter calls IouringMemPool::Deallocate(buf)
+// (thread-safe) when the last IOBuf reference is dropped.
+// A fresh block is allocated from IouringMemPool for the next READ_FIXED.
+//
+// READ path (--iouring_register_buffers=false)
+// --------------------------------------------
+// ctx->bounce points to a per-call malloc'd bounce buffer (64 KiB).
+// IOBuf takes ownership (free() destructor) when the data is wrapped.
+// ---------------------------------------------------------------------------
+
+void IouringEndpoint::PollCq(Socket* m) {
+ IouringEndpoint* ep = static_cast<IouringEndpoint*>(m->user());
+ if (!ep) { return; }
+
+ SocketUniquePtr s;
+ if (Socket::Address(ep->_socket->id(), &s) < 0) { return; }
+
+ // CQ-side operations need only the ring pointer; no lock required here
+ // because io_uring_peek_batch_cqe / io_uring_cqe_seen operate on the CQ
+ // which is only touched by the Poller thread.
+ Poller* poller = ep->GetPoller();
+ if (!poller) { return; }
+ struct io_uring* ring = &poller->ring;
+
+ struct io_uring_cqe* cqes[FLAGS_iouring_max_cqe_poll_once];
+ InputMessageClosure last_msg;
+ int progress = Socket::PROGRESS_INIT;
+
+ while (true) {
+ const int cnt = io_uring_peek_batch_cqe(
+ ring, cqes, static_cast<unsigned>(FLAGS_iouring_max_cqe_poll_once));
+
+ if (cnt <= 0) {
+ if (s->Failed()) { return; }
+ if (!m->MoreReadEvents(&progress)) { break; }
+ continue;
+ }
+
+ ssize_t bytes_read = 0;
+
+ for (int i = 0; i < cnt; ++i) {
+ struct io_uring_cqe* cqe = cqes[i];
+ const uint64_t udata = cqe->user_data;
+
+ // Only process CQEs that bRPC submitted (bit 63 == 1).
+ // All other CQEs are user-submitted; leave them in the ring so the
+ // user callback (called right after PollCq returns) can
+ // drain and handle them. Users need no special tagging – bit 63
+ // is never set in a canonical user-space pointer or small integer.
+ if (!(udata & kBrpcCqeTag)) {
+ continue; // do NOT call io_uring_cqe_seen()
+ }
+
+ // Strip the tag bit to recover the original IouringReqContext*.
+ // udata == kBrpcCqeTag (NOP wake-up SQE) → ctx will be nullptr.
+ IouringReqContext* ctx =
+ reinterpret_cast<IouringReqContext*>(
+ static_cast<uintptr_t>(udata & ~kBrpcCqeTag));
+
+ if (!ctx) {
+ io_uring_cqe_seen(ring, cqe);
+ continue;
+ }
+
+ const int res = cqe->res;
+ io_uring_cqe_seen(ring, cqe);
+
+ // ---------------------------------------------------------------
+ // Error handling
+ // ---------------------------------------------------------------
+ if (res < 0) {
+ if (res == -ECANCELED) {
+ // Op was cancelled.
+ // READ_FIXED: _read_slot still holds the block that was
+ // submitted for this CQE. DeallocateResources will pass
+ // it in the REMOVE SidOp so the Poller thread can call
+ // IouringMemPool::Deallocate(buf).
+ // IOURING_OP_READ: free the malloc'd bounce buffer.
+ if (ctx->op == IOURING_OP_READ) { free(ctx->bounce); }
+ butil::return_object(ctx);
+ continue;
+ }
+
+ const int saved_errno = -res;
+ LOG(WARNING) << "io_uring CQE error fd=" << ctx->fd
+ << " socket_id=" << ctx->socket_id
+ << " op=" << (int)ctx->op
+ << ": " << berror(saved_errno);
+ SocketUniquePtr cs;
+ if (Socket::Address(ctx->socket_id, &cs) == 0) {
+ cs->SetFailed(saved_errno, "io_uring op error: %s",
+ berror(saved_errno));
+ }
+ if (ctx->op == IOURING_OP_READ) { free(ctx->bounce); }
+ butil::return_object(ctx);
+ continue;
+ }
+
+ // ---------------------------------------------------------------
+ // Dispatch by operation type
+ // ---------------------------------------------------------------
+ if (ctx->op == IOURING_OP_READ || ctx->op == IOURING_OP_READ_FIXED) {
+ if (res == 0) {
+ // EOF
+ SocketUniquePtr cs;
+ if (Socket::Address(ctx->socket_id, &cs) == 0) {
+ cs->SetEOF();
+ }
+ if (ctx->op == IOURING_OP_READ) { free(ctx->bounce); }
+ butil::return_object(ctx);
+ continue;
+ }
+
+ if (ctx->op == IOURING_OP_READ_FIXED) {
+ // ---------------------------------------------------------
+ // Registered mode – zero-copy.
+ //
+ // The kernel has written |res| bytes into the block at
+ // ep->_read_slot.buf (a page from IouringMemPool that is
+ // permanently pinned for this ring).
+ //
+ // Rotation:
+ // 1. Steal the current slot from the endpoint.
+ // 2. Submit the next READ_FIXED immediately (SubmitRead
+ // allocates a new block from IouringMemPool).
+ // 3. Wrap the data zero-copy in IOBuf; the deleter
+ // calls IouringMemPool::Deallocate(buf) on any thread
+ // when the last reference drops (thread-safe).
+ //
+ // No separate slot pool is needed: every IouringMemPool
+ // block already has a permanent buf_index, and
+ // Allocate/Deallocate are its own reference count.
+ // ---------------------------------------------------------
+ IouringReadSlot consumed_slot = ep->_read_slot;
+ ep->_read_slot = {};
+
+ // Submit the next read before handing off the consumed
+ // slot so back-to-back arrivals never stall.
+ ep->SubmitRead(ctx->fd);
+
+ // Zero-copy: wrap the consumed buffer in IOBuf.
+ // The lambda captures consumed_slot by value; it is
+ // self-contained and can run on any thread.
+ void* const buf = consumed_slot.buf;
+ butil::IOBuf tmp;
+ tmp.append_user_data(
+ buf,
+ static_cast<size_t>(res),
+ [buf](void* /*ptr*/) {
+ IouringMemPool::Instance().Deallocate(buf);
+ });
+ m->_read_buf.append(std::move(tmp));
+ } else {
+ // ---------------------------------------------------------
+ // Unregistered mode – bounce buffer.
+ //
+ // The bounce buffer was malloc'd in SubmitRead and stored in
+ // ctx->bounce. Transfer ownership to IOBuf (free() is
+ // called when IOBuf discards the block).
+ // ---------------------------------------------------------
+ butil::IOBuf tmp;
+ tmp.append_user_data(ctx->bounce,
+ static_cast<size_t>(res),
+ free);
+ ctx->bounce = nullptr; // ownership transferred
+ m->_read_buf.append(std::move(tmp));
+
+ ep->SubmitRead(ctx->fd);
+ }
+
+ bytes_read += res;
+
+ } else {
+ // WRITE / WRITE_FIXED completion
+ IouringEndpoint* dep = ep;
+ if (ctx->socket_id != ep->_socket->id()) {
+ SocketUniquePtr cs;
+ if (Socket::Address(ctx->socket_id, &cs) == 0) {
+ dep = static_cast<IouringEndpoint*>(cs->user());
+ }
+ }
+ if (dep) {
+ dep->_inflight_writes.fetch_sub(1, butil::memory_order_relaxed);
+ dep->_socket->WakeAsEpollOut();
+ }
+ }
+
+ butil::return_object(ctx);
+ } // for each cqe
+
+ if (bytes_read > 0) {
+ const int64_t received_us = butil::cpuwide_time_us();
+ const int64_t base_realtime = butil::gettimeofday_us() - received_us;
+ InputMessenger* messenger = static_cast<InputMessenger*>(s->user());
+ if (messenger && messenger->ProcessNewMessage(
+ s.get(), bytes_read, false,
+ received_us, base_realtime, last_msg) < 0) {
+ return;
+ }
+ }
+ }
+}
+
+// ---------------------------------------------------------------------------
+// DebugInfo
+// ---------------------------------------------------------------------------
+
+void IouringEndpoint::DebugInfo(std::ostream& os,
+ butil::StringPiece connector) const {
+ os << "iouring_polling_mode=" << FLAGS_iouring_polling_mode
+ << connector
+ << "iouring_inflight_writes="
+ << _inflight_writes.load(butil::memory_order_relaxed)
+ << connector << "iouring_writable=" << IsWritable()
+ << connector << "iouring_register_buffers=" << IsFixedBuffersEnabled();
+ if (IsFixedBuffersEnabled() && _read_slot.buf != nullptr) {
+ os << " buf_index=" << _read_slot.buf_index
+ << " slot_size=" << _read_slot.size;
+ }
+}
+
+// ---------------------------------------------------------------------------
+// GlobalInitialize / GlobalRelease
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::GlobalInitialize() {
+ _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
+ return 0;
+}
+
+void IouringEndpoint::GlobalRelease() {
+ for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+ PollingModeRelease(i);
+ }
+}
+
+// ---------------------------------------------------------------------------
+// PollerDrainOpQueue
+//
+// Dequeues all pending SidOps from poller->op_queue and applies them:
+// ADD – track the socket, allocate a block from IouringMemPool (fixed
+// mode) via SubmitRead, issue the first read.
+// REMOVE – return any in-flight read buffer to IouringMemPool, stop
+// tracking the socket.
+//
+// Must run exclusively on the Poller thread.
+// ---------------------------------------------------------------------------
+
+void IouringEndpoint::PollerDrainOpQueue(
+ Poller* poller,
+ std::unordered_set<SocketId>& tracked_sids) {
+ SidOp op;
+ while (poller->op_queue.Dequeue(op)) {
+ if (op.type == SidOp::ADD) {
+ tracked_sids.emplace(op.sid);
+ SocketUniquePtr s_add;
+ if (Socket::Address(op.sid, &s_add) == 0) {
+ IouringEndpoint* ep =
+ static_cast<IouringEndpoint*>(s_add->user());
+ if (ep) {
+ // Issue the first SubmitRead on the Poller thread.
+ // In fixed-buffer mode SubmitRead allocates a block from
+ // IouringMemPool; no separate slot pool is needed.
+ if (ep->SubmitRead(s_add->fd()) < 0) {
+ LOG(WARNING)
+ << "IouringEndpoint: first SubmitRead "
+ "failed for socket "
+ << op.sid << ": " << berror();
+ }
+ }
+ }
+ } else {
+ // REMOVE: if a READ_FIXED was in flight (buf != null), the kernel
+ // has not yet written into it. Return the block to IouringMemPool
+ // now; no CQE will arrive for a cancelled READ_FIXED (it arrives
+ // as -ECANCELED and is freed in PollCq's error path instead).
+ // For safety we Deallocate here only when _read_slot is still
+ // owned by the endpoint (i.e. SubmitRead set it but PollCq has
+ // not yet consumed it).
+ if (IsFixedBuffersEnabled() && op.read_slot.buf != nullptr) {
+ IouringMemPool::Instance().Deallocate(op.read_slot.buf);
+ }
+ tracked_sids.erase(op.sid);
+ }
+ }
+}
+
+// ---------------------------------------------------------------------------
+// PollingModeInitialize
+// ---------------------------------------------------------------------------
+
+std::vector<IouringEndpoint::PollerGroup> IouringEndpoint::_poller_groups;
+
+int IouringEndpoint::PollingModeInitialize(
+ bthread_tag_t tag,
+ std::function<void(IouringPollerHandle)> callback,
+ std::function<void(IouringPollerHandle)> init_fn,
+ std::function<void(IouringPollerHandle)> release_fn) {
+
+ if (tag < 0 || tag >= static_cast<int>(_poller_groups.size())) {
+ LOG(ERROR) << "io_uring: invalid bthread tag " << tag;
+ return -1;
+ }
+
+ auto& group = _poller_groups[tag];
+ auto& pollers = group.pollers;
+ auto& running = group.running;
+
+ bool expected = false;
+ if (!running.compare_exchange_strong(expected, true)) { return 0; }
+
+ // -----------------------------------------------------------------------
+ // Poller thread arguments
+ // -----------------------------------------------------------------------
+ struct FnArgs {
+ Poller* poller;
+ std::atomic<bool>* running;
+ bthread_tag_t tag;
+ int index; // poller index within the group
+ };
+
+ // -----------------------------------------------------------------------
+ // Poller thread body
+ // -----------------------------------------------------------------------
+ auto fn = [](void* p) -> void* {
+ std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
+ Poller* poller = args->poller;
+ std::atomic<bool>* running = args->running;
+
+ // 1. Create the ring.
+ struct io_uring_params params = BuildRingParams();
+ const unsigned sq_size = static_cast<unsigned>(GetIouringSqSize());
+
+ int ret = io_uring_queue_init_params(sq_size, &poller->ring, &params);
+ if (ret < 0) {
+ LOG(ERROR) << "io_uring_queue_init_params failed: " << berror(-ret);
+ running->store(false, std::memory_order_relaxed);
+ return nullptr;
+ }
+ poller->ring_initialized = true;
+
+ // 2. Initialise the fixed-buffer infrastructure.
+ if (IsFixedBuffersEnabled()) {
+ // 2a. Register this ring with IouringMemPool.
+ // The callback is called for each existing region immediately
+ // (to bring the ring up to date) and for each future region
+ // (when the pool grows). It issues register_buffers_update /
+ // full re-registration to pin the new pages in this ring.
+ IouringMemPool::Instance().AddRingRegistrar(
+ &poller->ring,
+ [poller](void* base, size_t size, size_t block_size,
+ int buf_index_base) {
+ // Build one iovec per block in this new region.
+ const int n = static_cast<int>(size / block_size);
+ std::vector<struct iovec> iovs(n);
+ for (int i = 0; i < n; ++i) {
+ iovs[i].iov_base =
+ static_cast<char*>(base) + i * block_size;
+ iovs[i].iov_len = block_size;
+ }
+ // Register the new region's buffers.
+ // io_uring_register_buffers_update (kernel >= 5.13) allows
+ // incremental updates; fall back to a full re-registration
+ // on older kernels or if the function is unavailable.
+ int ret = -ENOSYS;
+#ifdef IORING_REGISTER_BUFFERS_UPDATE
+ ret = io_uring_register_buffers_update(
+ &poller->ring,
+ static_cast<unsigned>(buf_index_base),
+ iovs.data(),
+ static_cast<unsigned>(n));
+#endif
+ if (ret < 0) {
+ // Full re-registration path.
+ // For the first region just call register_buffers.
+ // For subsequent regions we must rebuild the complete
+ // table; here we only have the new slice so we
+ // attempt a best-effort register and log on failure.
+ if (buf_index_base > 0) {
+ // Unregister previous table before re-registering.
+ io_uring_unregister_buffers(&poller->ring);
+ }
+ int r2 = io_uring_register_buffers(&poller->ring,
Review Comment:
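The fallback path taken when `io_uring_register_buffers_update()` fails or is
unavailable unregisters the existing buffer table and then re-registers **only
the new region's** iovecs. This drops the registration of every previously
registered region, while `GetBufIndex()` still returns indices spanning all
regions. Once the pool grows, fixed-buffer reads/writes therefore fail (e.g.
EINVAL/EFAULT) or target the wrong buffer. Note that the
`#ifdef IORING_REGISTER_BUFFERS_UPDATE` guard does not limit this to old
kernels: `IORING_REGISTER_BUFFERS_UPDATE` is an enumerator in
`<linux/io_uring.h>`, not a preprocessor macro, so the guard is never satisfied
and this fallback runs on every kernel, not only on kernels < 5.13. The fallback
needs to rebuild and register the full iovec table for *all* regions (or disable
growth / fail fast on growth when incremental updates are unsupported).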
##########
src/brpc/iouring/iouring_block_pool.cpp:
##########
@@ -0,0 +1,566 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_IOURING
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/uio.h>
+
+#include <gflags/gflags.h>
+#include <liburing.h>
+
+#include "butil/errno.h" // berror()
+#include "butil/fast_rand.h" // butil::fast_rand()
+#include "butil/logging.h"
+#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
+#include "butil/iobuf.h" // butil::iobuf::blockmem_allocate
+#include "bvar/bvar.h" // bvar::Adder, bvar::PerSecond,
bvar::PassiveStatus
+#include "brpc/iouring/iouring_block_pool.h"
+
+// iobuf internal hooks – declared in iobuf.cpp / iobuf_inl.h
+namespace butil {
+namespace iobuf {
+extern void* (*blockmem_allocate)(size_t);
+extern void (*blockmem_deallocate)(void*);
+}
+}
+
+namespace brpc {
+namespace iouring {
+
+// ---------------------------------------------------------------------------
+// bvar metrics for IouringMemPool allocation paths
+// ---------------------------------------------------------------------------
+//
+// iouring_mem_pool_tls_alloc_count – cumulative TLS-hit allocations
+// iouring_mem_pool_global_alloc_count – cumulative global-bucket-hit
allocations
+// (TLS miss, global bucket had blocks)
+// iouring_mem_pool_pool_grow_count – cumulative pool-grow events
+// (TLS miss + all buckets empty →
AddRegion)
+// iouring_mem_pool_tls_alloc_second – TLS-hit allocations per second
+// iouring_mem_pool_global_alloc_second– global-bucket allocations per second
+// iouring_mem_pool_tls_hit_rate – ratio tls_alloc / (tls_alloc +
global_alloc)
+// over the lifetime of the process.
+//
+// During normal operation tls_hit_rate should be close to 1.0 and
+// global_alloc_second should be near 0. A sustained non-zero
+// global_alloc_second or a hit-rate below ~0.99 indicates the TLS cache
+// is too small (--iouring_mem_pool_tls_cache_max) or the pool needs more
+// initial memory (--iouring_mem_pool_initial_mb).
+
+static bvar::Adder<int64_t> g_tls_alloc_count;
+static bvar::Adder<int64_t> g_global_alloc_count;
+static bvar::Adder<int64_t> g_pool_grow_count;
+
+static bvar::PerSecond<bvar::Adder<int64_t>> g_tls_alloc_per_second(
+ "iouring_mem_pool_tls_alloc_second", &g_tls_alloc_count);
+static bvar::PerSecond<bvar::Adder<int64_t>> g_global_alloc_per_second(
+ "iouring_mem_pool_global_alloc_second", &g_global_alloc_count);
+
+// Named Adder bvars so the raw counters are also accessible by name.
+static bvar::Window<bvar::Adder<int64_t>> g_tls_alloc_window(
+ "iouring_mem_pool_tls_alloc_count", &g_tls_alloc_count, 0);
+static bvar::Window<bvar::Adder<int64_t>> g_global_alloc_window(
+ "iouring_mem_pool_global_alloc_count", &g_global_alloc_count, 0);
+static bvar::Window<bvar::Adder<int64_t>> g_pool_grow_window(
+ "iouring_mem_pool_pool_grow_count", &g_pool_grow_count, 0);
+
+// PassiveStatus that computes the lifetime TLS hit-rate on demand.
+static double GetTlsHitRate(void*) {
+ const int64_t tls = g_tls_alloc_count.get_value();
+ const int64_t global = g_global_alloc_count.get_value();
+ const int64_t total = tls + global;
+ return total > 0 ? static_cast<double>(tls) / static_cast<double>(total)
+ : 1.0;
+}
+static bvar::PassiveStatus<double> g_tls_hit_rate(
+ "iouring_mem_pool_tls_hit_rate", GetTlsHitRate, nullptr);
+
+// ---------------------------------------------------------------------------
+// gflags
+// ---------------------------------------------------------------------------
+
+DEFINE_bool(iouring_register_buffers, false,
+ "Enable io_uring pre-registered buffer I/O (READ_FIXED / "
+ "WRITE_FIXED). When true, all IOBuf blocks are allocated from a "
+ "registered slab so that writes can use IORING_OP_WRITE_FIXED "
+ "with no per-op page pinning, and reads use IORING_OP_READ_FIXED. "
+ "Requires kernel >= 5.1.");
+
+DEFINE_int32(iouring_mem_pool_initial_mb, 256,
+ "Initial size of the io_uring fixed-buffer memory pool (MB). "
+ "Effective only with --iouring_register_buffers=true.");
+
+DEFINE_int32(iouring_mem_pool_increase_mb, 256,
+ "Growth increment when the pool is exhausted (MB). "
+ "Effective only with --iouring_register_buffers=true.");
+
+DEFINE_int32(iouring_mem_pool_max_regions, 8,
+ "Maximum number of memory regions. "
+ "Each region causes one io_uring_register_buffers_update() per "
+ "ring on growth.");
+
+DEFINE_int32(iouring_iobuf_block_size, 8192,
+ "Size of each IOBuf block when --iouring_register_buffers=true. "
+ "butil::SetDefaultBlockSize() is called with this value at "
+ "startup so IOBuf and the registered slab are always in sync.");
+
+DEFINE_int32(iouring_mem_pool_free_buckets, 8,
+ "Number of independent global free-list buckets in the io_uring "
+ "fixed-buffer memory pool. More buckets reduce mutex contention "
+ "when many threads flush/refill their TLS caches concurrently. "
+ "Must be in [1, 64]. Takes effect only at pool initialisation "
+ "(--iouring_register_buffers=true).");
+
+DEFINE_int32(iouring_mem_pool_tls_cache_num, 128,
+ "Maximum number of blocks cached per thread in the TLS free-list "
+ "of the io_uring fixed-buffer memory pool. Larger values reduce "
+ "trips to the global bucket at the cost of per-thread memory "
+ "overhead (each block is iouring_iobuf_block_size bytes). "
+ "Must be in [1, 4096]. Takes effect only at pool initialisation "
+ "(--iouring_register_buffers=true).");
+
+// ---------------------------------------------------------------------------
+bool IsFixedBuffersEnabled() { return FLAGS_iouring_register_buffers; }
+
+// ---------------------------------------------------------------------------
+// IouringMemPool – implementation
+// ---------------------------------------------------------------------------
+
+__thread IouringMemPool::FreeNode* IouringMemPool::tls_free_ = nullptr;
+__thread size_t IouringMemPool::tls_free_cnt_ = 0;
+
+static const size_t kBytesPerMB = 1UL << 20;
+
+// Validated bounds for the two new gflags.
+static const int kMinFreeBuckets = 1;
+static const int kMaxFreeBuckets = 64;
+static const int kMinTlsCacheNum = 1;
+static const int kMaxTlsCacheNum = 4096;
+
+IouringMemPool& IouringMemPool::Instance() {
+ static IouringMemPool inst;
+ return inst;
+}
+
+// ---------------------------------------------------------------------------
+// Init
+// ---------------------------------------------------------------------------
+bool IouringMemPool::Init(size_t block_size) {
+ if (initialized_) {
+ LOG(WARNING) << "IouringMemPool already initialized";
+ return true;
+ }
+ if (block_size == 0 || block_size % 4096 != 0) {
+ LOG(ERROR) << "IouringMemPool::Init: block_size must be a nonzero "
+ "multiple of 4096, got " << block_size;
+ return false;
+ }
+
+ // Validate and apply free_buckets gflag.
+ int nbuckets = FLAGS_iouring_mem_pool_free_buckets;
+ if (nbuckets < kMinFreeBuckets || nbuckets > kMaxFreeBuckets) {
+ LOG(WARNING) << "iouring_mem_pool_free_buckets (" << nbuckets
+ << ") out of [" << kMinFreeBuckets << ","
+ << kMaxFreeBuckets << "], clamped to 8";
+ nbuckets = 8;
+ }
+ num_free_buckets_ = nbuckets;
+ free_buckets_.reset(new (std::nothrow) FreeBucket[num_free_buckets_]);
+ if (!free_buckets_) {
+ LOG(ERROR) << "IouringMemPool::Init: failed to allocate bucket array";
+ return false;
+ }
+
+ // Validate and apply tls_cache_num gflag.
+ int tls_cache = FLAGS_iouring_mem_pool_tls_cache_num;
+ if (tls_cache < kMinTlsCacheNum || tls_cache > kMaxTlsCacheNum) {
+ LOG(WARNING) << "iouring_mem_pool_tls_cache_num (" << tls_cache
+ << ") out of [" << kMinTlsCacheNum << ","
+ << kMaxTlsCacheNum << "], clamped to 128";
+ tls_cache = 128;
+ }
+ tls_cache_max_ = static_cast<size_t>(tls_cache);
+
+ block_size_ = block_size;
+
+ // Hook IOBuf's block allocator.
+ prev_allocate_ = butil::iobuf::blockmem_allocate;
+ prev_deallocate_ = butil::iobuf::blockmem_deallocate;
+ butil::iobuf::blockmem_allocate = MemPoolAllocate;
+ butil::iobuf::blockmem_deallocate = MemPoolDeallocate;
+
+ // Allocate the initial region.
+ if (!AddRegion(static_cast<size_t>(FLAGS_iouring_mem_pool_initial_mb))) {
+ // Unhook on failure.
+ butil::iobuf::blockmem_allocate = prev_allocate_;
+ butil::iobuf::blockmem_deallocate = prev_deallocate_;
+ return false;
+ }
Review Comment:
`IouringMemPool::AddRegion()` is documented as requiring `extend_lock_` to
be held, but `Init()` calls `AddRegion()` without taking that lock. Even if
startup is typically single-threaded, this violates the function's own contract
and makes future concurrency bugs more likely (e.g. if init ever happens
concurrently with `AddRingRegistrar()`/growth).
##########
docs/cn/iouring.md:
##########
@@ -0,0 +1,329 @@
+# io_uring 支持
+
+## 编译
+
+io_uring 依赖内核 5.1+(推荐 5.10+),仅支持 Linux 系统。需要先安装或编译 [liburing](https://github.com/axboe/liburing)。
+
+### 准备 liburing
+
+```bash
+# 方式一:发行版包管理(推荐)
+sudo apt install liburing-dev # Debian / Ubuntu
+sudo yum install liburing-devel # CentOS / RHEL
+
+# 方式二:源码编译
+git clone https://github.com/axboe/liburing.git /path/to/liburing
+cd /path/to/liburing && make
+```
+
+### 使用 CMake
+
+```bash
+mkdir bld && cd bld
+cmake -DWITH_IOURING=ON \
+ -DIOURING_INCLUDE_PATH=/path/to/liburing/src/include \
+ -DIOURING_LIB=/path/to/liburing/src/liburing.a ..
+make
+```
+
+> **注意**:CMake 变量名为 `IOURING_INCLUDE_PATH` 和 `IOURING_LIB`,不是 `LIBURING_INCLUDE_PATH` / `LIBURING_LIBRARY`,拼写错误会触发 CMake Warning 且实际不生效。
+
+如果 liburing 已通过包管理安装,省略后两个 `-D` 参数即可:
+
+```bash
+cmake -DWITH_IOURING=ON .. && make
+```
+
+如果 liburing 安装在非标准路径,也可以用 `CMAKE_PREFIX_PATH` 让 CMake 自动发现:
+
+```bash
+cmake -DWITH_IOURING=ON -DCMAKE_PREFIX_PATH=/path/to/liburing/install .. && make
+```
+
+#### 编译示例程序
+
+```bash
+cd example/iouring_echo_c++
+mkdir bld && cd bld
+cmake -DBRPC_WITH_IOURING=ON ..
+make
+```
+
+### 使用 Bazel
+
+Bazel 构建通过 `--define=BRPC_WITH_IOURING=true` 开关启用 io_uring 支持。liburing 通过 `http_archive` 从 GitHub 自动下载(当前锁定到 liburing-2.14),无需手动准备源码。
+
+#### 编译 brpc 库
+
+```bash
+bazel build //:brpc --define=BRPC_WITH_IOURING=true
+```
+
+#### 编译示例程序
+
+```bash
+bazel build //example:iouring_echo_server //example:iouring_echo_client --define=BRPC_WITH_IOURING=true
+```
+
+---
+
+## 基本原理
+
+io_uring 通过内核与用户态共享的提交队列(SQ)和完成队列(CQ)实现异步 I/O,避免了每次 I/O 的系统调用开销。brpc 的实现复用了原有的 `Socket` 类,每个 Socket 持有一个 `IouringEndpoint`(`src/brpc/iouring/iouring_endpoint.cpp`)。
+
+- **写路径**:`CutFromIOBufList` 将 `IOBuf` 中的数据段提交为 `IORING_OP_WRITEV`(或注册模式下的 `IORING_OP_WRITE_FIXED`)SQE。
+- **读路径**:`SubmitRead` 提交 `IORING_OP_READ`(或 `IORING_OP_READ_FIXED`)SQE,Poller 线程通过 `PollCq` 收割 CQE,复用 `InputMessenger` 完成消息解析。
+
+### one-thread-per-ring 架构
+
+每个 `bthread_tag` 拥有一个 `PollerGroup`,包含**恰好一个** Poller bthread,持有一个独立的 `io_uring` 实例(ring)。
+
+**所有 SQ 操作(`SubmitRead`、`CutFromIOBufList`)都在 Poller bthread 上执行**,不需要任何锁。新连接通过 MPSC 队列(`op_queue`)发送 ADD/REMOVE 消息给 Poller,由 Poller 在主循环中处理。
+
+> **为什么每个 tag 只能有一个 Poller?**
+> io_uring 的 SQ 是单生产者设计,不支持并发写入。bthread 采用 work-stealing 调度,同一 `bthread_tag` 内的 bthread 会在多个 pthread 上运行。若同一 tag 存在多个 Poller,两个 Poller bthread 可能被 steal 到不同 pthread 并发操作各自 ring 的 SQ,同时业务 bthread 也可能在另一个 pthread 上提交 SQE,产生竞争。因此**一个 bthread_tag 只能有一个 Poller**,水平扩展应通过增加 bthread_tag 数量(`--task_group_ntags`)实现,每个 tag 独占一个 Poller 和一个 ring。
+
+### CQE 收割策略
+
+通过 `--iouring_polling_mode` 选择 Poller 线程的收割策略:
+
+- **none**(默认):中断驱动模式,Poller 调用 `io_uring_wait_cqe_timeout`(超时 1 ms)阻塞等待内核通知。1 ms 超时的目的是保持 Poller 循环对 `op_queue` 中新连接的响应性,同时在无 I/O 时避免 CPU 空转。适合通用场景。
+- **sqpoll**:启用 `IORING_SETUP_SQPOLL`,内核线程持续轮询 SQ,无需 `io_uring_submit` 系统调用,延迟最低,需要 `CAP_SYS_NICE`。
+- **iopoll**:`IORING_SETUP_IOPOLL`,仅对 O_DIRECT 块设备有效。
+- **hybrid**:先忙转 N 次(`--iouring_hybrid_spin_count`),无 CQE 后再阻塞,兼顾延迟和 CPU 利用率。
+
+---
+
+## 启动与初始化
+
+在调用 `brpc::Server::Start()` 之前完成 io_uring 初始化:
+
+```cpp
+#include <brpc/iouring/iouring_helper.h>
+
+// 1. 全局初始化:探测内核能力,初始化内存池(若启用了 --iouring_register_buffers)
+brpc::iouring::GlobalIouringInitializeOrDie();
+
+// 2. 为指定 bthread tag 创建 ring 并启动 Poller bthreads
+// tag=0 为默认 tag,覆盖所有普通 bthread
+if (!brpc::iouring::InitPollingModeWithTag(/*tag=*/0)) {
+ LOG(FATAL) << "Failed to init io_uring";
+}
+```
+
+`InitPollingModeWithTag` 支持三个可选回调,均在 Poller 线程上调用:
+
+```cpp
+brpc::iouring::InitPollingModeWithTag(
+ /*tag=*/0,
+ /*callback=*/[](brpc::iouring::IouringPollerHandle h) {
+ // 每次 PollCq 之后调用,用于提交用户自定义 SQE 或收割用户 CQE。
+ // 注意:bRPC 的 PollCq 会跳过 user_data bit63=0 的 CQE 且不调
+ // io_uring_cqe_seen(),用户必须在此处手动 drain,否则 CQ 会满。
+ h.Submit([](::io_uring* r) -> int {
+ // drain 用户 CQE
+ struct io_uring_cqe* cqe = nullptr;
+ while (io_uring_peek_cqe(r, &cqe) == 0) {
+ if (cqe->user_data & brpc::iouring::kBrpcCqeTag) break;
+ // 处理 cqe->res …
+ io_uring_cqe_seen(r, cqe);
+ }
+ // 提交用户 SQE
+ ::io_uring_sqe* sqe = io_uring_get_sqe(r);
+ if (!sqe) return 0;
+ io_uring_prep_nop(sqe);
+ sqe->user_data = my_token; // bit63 必须为 0
+ return 1;
+ });
+ },
+ /*init_fn=*/nullptr, // ring 创建后调用一次
+ /*release_fn=*/nullptr // ring 销毁前调用一次
+);
+```
+
+完整示例见 `example/iouring_echo_c++/server.cpp`。
+
+---
+
+## 参数说明
+
+所有参数均通过 gflags 在命令行传入。
+
+### Ring 大小
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| `iouring_sq_size` | `256` | 每个 ring 的 SQ 深度(并发 in-flight SQE 上限) |
+| `iouring_cq_size` | `0` | 每个 ring 的 CQ 深度(0 表示 2 × sq_size) |
+
+### Poller 线程
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| `iouring_poller_yield` | `false` | 每轮 poll 后 yield,降低 CPU 占用(会增加尾延迟) |
+| `iouring_max_cqe_poll_once` | `32` | 每次 `io_uring_peek_batch_cqe` 最多收割的 CQE 数 |
+
+### 轮询模式
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| `iouring_polling_mode` | `none` | CQE 收割策略,见下表 |
+| `iouring_sqpoll_idle_ms` | `2000` | SQPOLL 内核线程空闲超时(ms) |
+| `iouring_sqpoll_cpu` | `-1` | SQPOLL 内核线程绑定的 CPU(-1 = 不绑定) |
+| `iouring_hybrid_spin_count` | `1000` | hybrid 模式下忙转的迭代次数后再阻塞 |
+
+`iouring_polling_mode` 可选值:
+
+| 值 | 说明 |
+|----|------|
+| `none` | 中断驱动(默认):Poller 调用 `io_uring_wait_cqe_timeout`(超时 1 ms)阻塞等待内核通知,1 ms 超时保持对新连接的响应性 |
+| `sqpoll` | 内核 SQ 轮询线程(`IORING_SETUP_SQPOLL`),Poller 用 peek 收割 CQE,最低延迟,需要 `CAP_SYS_NICE` |
+| `iopoll` | 块设备完成轮询(`IORING_SETUP_IOPOLL`),内核不产生中断,Poller 每次 submit 后主动 peek,仅对 O_DIRECT 块设备有效 |
+| `hybrid` | 先忙转 N 次再阻塞,兼顾延迟和 CPU 利用率 |
+
+### 注册内存(零拷贝)
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| `iouring_register_buffers` | `false` | 启用 `IORING_OP_READ_FIXED` / `IORING_OP_WRITE_FIXED` 零拷贝模式 |
+| `iouring_mem_pool_initial_mb` | `256` | 初始注册内存大小(MiB) |
+| `iouring_mem_pool_increase_mb` | `256` | 内存池扩容步长(MiB) |
+| `iouring_mem_pool_max_regions` | `8` | 最大扩容次数 |
+| `iouring_iobuf_block_size` | `8192` | IOBuf block 和 read slot 的大小(字节),须与内存池对齐 |
+| `iouring_read_slot_num` | `256` | 每个 ring 初始 read slot 数量 |
+| `iouring_read_slot_max` | `4096` | 每个 ring 最大 read slot 数量 |
Review Comment:
The doc lists `iouring_read_slot_num` / `iouring_read_slot_max` gflags, but
these flags are not defined anywhere in the codebase (the implementation uses
`IouringMemPool` blocks rather than a read-slot pool). This makes the
documentation misleading for users trying to tune the transport.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]