ivanallen commented on code in PR #3328:
URL: https://github.com/apache/brpc/pull/3328#discussion_r3355218734


##########
src/brpc/iouring/iouring_endpoint.cpp:
##########
@@ -0,0 +1,993 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_IOURING
+
+#include <errno.h>
+#include <liburing.h>
+#include <sys/uio.h>
+#include <unordered_set>
+#include <string.h>
+
+#include <gflags/gflags.h>
+#include "butil/atomicops.h"
+#include "butil/fd_utility.h"
+#include "butil/logging.h"
+#include "butil/macros.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "bthread/bthread.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/iouring/iouring_block_pool.h"
+#include "brpc/iouring/iouring_helper.h"
+#include "brpc/iouring/iouring_endpoint.h"
+
+DECLARE_int32(task_group_ntags);
+
+namespace brpc {
+namespace iouring {
+
+// ---------------------------------------------------------------------------
+// gflags (endpoint-level tunables)
+// ---------------------------------------------------------------------------
+
+// Each bthread_tag has exactly one Poller bthread (and one io_uring ring).
+// io_uring's SQ is single-producer; bthread work-stealing means multiple
+// Pollers per tag could run on different pthreads and race on the SQ.
+// Horizontal scaling is achieved by increasing --task_group_ntags instead.
+
+// Poller tunable: when true, the Poller yields after every poll iteration,
+// trading tail latency for lower CPU usage (see help string).
+DEFINE_bool(iouring_poller_yield, false,
+            "Yield (bthread_yield / sched_yield) after each poll iteration. "
+            "Reduces CPU usage at the cost of higher tail latency.");
+
+// Batch size for reaping completions: upper bound on CQEs taken per
+// io_uring_peek_batch_cqe() call.
+DEFINE_int32(iouring_max_cqe_poll_once, 32,
+             "Maximum CQEs reaped per io_uring_peek_batch_cqe() call.");
+
+// NOTE(review): presumably caps the per-endpoint _inflight_writes counter;
+// it is not referenced in the visible part of this file -- confirm its use.
+static const int32_t MAX_INFLIGHT_WRITES = 64;
+
+// ---------------------------------------------------------------------------
+// Constructor / Destructor
+// ---------------------------------------------------------------------------
+
+IouringEndpoint::IouringEndpoint(Socket* s)
+    : _socket(s), _inflight_writes(0) {
+    // The endpoint owns no read slot yet; in registered-buffer mode one is
+    // handed out later on the Poller thread when it processes the ADD message.
+    _read_slot = IouringReadSlot{};
+}
+
+// Destruction releases exactly what Reset() releases.
+IouringEndpoint::~IouringEndpoint() { Reset(); }
+
+void IouringEndpoint::Reset() {
+    // First hand any owned resources back to the Poller, then clear the
+    // in-flight write counter.
+    DeallocateResources();
+    _inflight_writes.store(0, butil::memory_order_relaxed);
+}
+
+// ---------------------------------------------------------------------------
+// Resource management
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::AllocateResources() {
+    // Nothing is allocated on the calling thread.  We only post an ADD message
+    // onto the Poller's MPSC op_queue; the Poller thread dequeues it on its
+    // next iteration, acquires the fixed read slot (registered-buffer mode)
+    // and issues the first SubmitRead itself, so no locking is involved.
+    PollerAddSid();
+    return 0;  // enqueueing cannot fail from the caller's point of view
+}
+
+void IouringEndpoint::DeallocateResources() {
+    // The (possibly empty) read slot travels inside the REMOVE message so the
+    // Poller thread -- the only thread that touches slot_pool -- can release
+    // it lock-free.  Afterwards this endpoint forgets the slot.
+    PollerRemoveSid(_read_slot);
+    _read_slot = IouringReadSlot{};
+}
+
+// ---------------------------------------------------------------------------
+// Ring / Poller access
+// ---------------------------------------------------------------------------
+
+// Returns the Poller serving this endpoint's socket, or nullptr when no
+// initialised ring is available (SubmitOneSqe maps nullptr to ENODEV).
+//
+// The calling bthread's tag selects the poller group (out-of-range tags fall
+// back to group 0) and a hash of the socket id selects the poller within the
+// group.  Empty containers are rejected up front: indexing an empty
+// _poller_groups is out of bounds, and `% pollers.size()` on an empty group
+// is a division by zero.
+IouringEndpoint::Poller* IouringEndpoint::GetPoller() const {
+    if (_poller_groups.size() == 0) { return nullptr; }
+    bthread_tag_t tag = bthread_self_tag();
+    if (tag < 0 || tag >= static_cast<int>(_poller_groups.size())) { tag = 0; }
+    auto& pollers = _poller_groups[tag].pollers;
+    if (pollers.size() == 0) { return nullptr; }
+    const size_t index = butil::fmix32(_socket->id()) % pollers.size();
+    if (!pollers[index].ring_initialized) { return nullptr; }
+    return &pollers[index];
+}
+
+// ---------------------------------------------------------------------------
+// IouringPollerHandle – implementation
+//
+// ring() and Submit() are implemented here (not in the header) because they
+// need access to IouringEndpoint::Poller and IouringEndpoint::_poller_groups,
+// which are private and not yet fully defined at the point where
+// IouringPollerHandle is declared in iouring_helper.h.
+// ---------------------------------------------------------------------------
+
+int IouringPollerHandle::Submit(
+        std::function<int(::io_uring*)> prepare_fn) const {
+    // Resolve (tag_, index_) to an initialised Poller.  Every way of missing
+    // it -- bad tag, bad index, ring not yet set up -- is reported as ENODEV.
+    IouringEndpoint::Poller* target = nullptr;
+    if (tag_ >= 0 &&
+        tag_ < static_cast<int>(IouringEndpoint::_poller_groups.size())) {
+        auto& group = IouringEndpoint::_poller_groups[tag_].pollers;
+        const bool index_ok =
+            index_ >= 0 && index_ < static_cast<int>(group.size());
+        if (index_ok && group[index_].ring_initialized) {
+            target = &group[index_];
+        }
+    }
+    if (target == nullptr) {
+        errno = ENODEV;
+        return -1;
+    }
+
+    // Runs on the Poller thread (passive path only), the single producer of
+    // this ring's SQ, so no lock is taken.  |prepare_fn| reports how many
+    // SQEs it prepared; a negative count is surfaced as EBUSY.
+    const int prepared = prepare_fn(&target->ring);
+    if (prepared < 0) {
+        errno = EBUSY;
+        return -1;
+    }
+    if (prepared == 0) {
+        return 0;  // nothing queued, nothing to submit
+    }
+    const int submitted = io_uring_submit(&target->ring);
+    if (submitted < 0) {
+        // NOTE(review): io_uring_submit() publishes the SQ tail before
+        // entering the kernel, so the prepared SQEs stay queued even on
+        // failure; callers must not free what they reference on -1.
+        errno = -submitted;
+        return -1;
+    }
+    return submitted;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitOneSqe
+//
+// Must be called on the Poller thread; no locking.
+// Gets one SQE, calls |prepare_fn(sqe)|, issues io_uring_submit().
+// Returns io_uring_submit() result (>= 0) or -1 (errno set).
+//   errno=ENOBUFS  → SQ full
+//   errno=ENODEV   → ring not yet initialised
+// ---------------------------------------------------------------------------
+
+int IouringEndpoint::SubmitOneSqe(
+        std::function<void(struct io_uring_sqe*)> prepare_fn) {
+    // Must run on the Poller thread: it is the only producer of the SQ.
+    Poller* poller = GetPoller();
+    if (!poller) { errno = ENODEV; return -1; }
+
+    struct io_uring_sqe* sqe = io_uring_get_sqe(&poller->ring);
+    if (!sqe) { errno = ENOBUFS; return -1; }  // SQ full; nothing was queued
+    prepare_fn(sqe);
+    int ret = io_uring_submit(&poller->ring);
+    if (ret < 0) {
+        // io_uring_submit() publishes the SQ tail *before* calling
+        // io_uring_enter(), so at this point the prepared SQE is already in
+        // the ring (with SQPOLL the kernel thread may even have consumed it)
+        // and cannot be rolled back; it should go out with the next
+        // successful submit.  Returning -1 here would make callers free the
+        // context/buffer referenced by sqe->user_data while the kernel may
+        // still use it (use-after-free), so report the SQE as queued.
+        LOG_EVERY_SECOND(WARNING) << "io_uring_submit failed: "
+                                  << strerror(-ret)
+                                  << "; SQE left queued in the SQ ring";
+        return 0;
+    }
+    return ret;
+}
+
+// ---------------------------------------------------------------------------
+// Ring parameters factory
+// ---------------------------------------------------------------------------
+
+// Builds the io_uring_params used to create a Poller ring from the polling
+// mode returned by GetPollingMode() and the iouring_* gflags.
+struct io_uring_params IouringEndpoint::BuildRingParams() {
+    struct io_uring_params p;
+    memset(&p, 0, sizeof(p));
+
+    const IouringPollingMode mode = GetPollingMode();
+
+    switch (mode) {
+    case IouringPollingMode::SQPOLL:
+    case IouringPollingMode::HYBRID:
+        p.flags |= IORING_SETUP_SQPOLL;
+        if (FLAGS_iouring_sqpoll_idle_ms > 0) {
+            p.sq_thread_idle = static_cast<unsigned>(FLAGS_iouring_sqpoll_idle_ms);
+        }
+        if (FLAGS_iouring_sqpoll_cpu >= 0) {
+            // Pin the SQ polling thread only when a CPU was requested.
+            p.flags |= IORING_SETUP_SQ_AFF;
+            p.sq_thread_cpu = static_cast<unsigned>(FLAGS_iouring_sqpoll_cpu);
+        }
+        break;
+    case IouringPollingMode::IOPOLL:
+        // NOTE(review): io_uring_setup(2) documents IORING_SETUP_IOPOLL as
+        // applicable only to polled block-device (O_DIRECT) I/O; socket reads
+        // and writes on such a ring are expected to fail (e.g. -EOPNOTSUPP).
+        // Confirm this mode is really meant for the socket data path.
+        p.flags |= IORING_SETUP_IOPOLL;
+        break;
+    case IouringPollingMode::NONE:
+    default:
+        break;
+    }
+
+    if (FLAGS_iouring_cq_size > 0) {
+        // NOTE(review): the kernel rejects (EINVAL) a CQ smaller than the SQ,
+        // so iouring_cq_size should be >= the ring's SQ entry count.
+        p.flags |= IORING_SETUP_CQSIZE;
+        p.cq_entries = static_cast<unsigned>(FLAGS_iouring_cq_size);
+    }
+
+    return p;
+}
+
+// ---------------------------------------------------------------------------
+// SubmitRead
+//
+// Registered mode  (--iouring_register_buffers=true):
+//   IORING_OP_READ_FIXED into _read_slot.buf / _read_slot.buf_index.
+//   _read_slot is always valid here (AllocateResources guarantees it).
+//
+// Unregistered mode (--iouring_register_buffers=false):
+//   IORING_OP_READ into a per-call malloc bounce buffer (64 KiB).
+//   The buffer is owned by IOBuf after PollCq and free()'d when consumed.
+// ---------------------------------------------------------------------------
+
+// Posts one asynchronous read on socket |fd| to this endpoint's Poller ring.
+// Must run on the Poller thread.  Returns 0 once the SQE is queued, or -1 with
+// errno set (ENOBUFS: no read slot yet / SQ full, ENOMEM, ENODEV, ...).
+//
+// Registered mode: IORING_OP_READ_FIXED into _read_slot, which the Poller
+// thread acquires while processing this socket's ADD message.
+// Unregistered mode: IORING_OP_READ into a malloc'ed bounce buffer whose
+// ownership is carried by the request context.
+int IouringEndpoint::SubmitRead(int fd) {
+    if (IsFixedBuffersEnabled()) {
+        const IouringReadSlot slot = _read_slot;
+        // A zero-length READ_FIXED would complete with res == 0 and be
+        // indistinguishable from EOF, so refuse to post one when no slot has
+        // been handed to this endpoint yet.
+        if (slot.buf == nullptr || slot.size == 0) {
+            errno = ENOBUFS;
+            return -1;
+        }
+        IouringReqContext* ctx = new IouringReqContext{};
+        ctx->fd        = fd;
+        ctx->socket_id = _socket->id();
+        ctx->op        = IOURING_OP_READ_FIXED;
+        int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+            io_uring_prep_read_fixed(sqe, fd,
+                                     slot.buf,
+                                     static_cast<unsigned>(slot.size),
+                                     /*offset=*/0,
+                                     slot.buf_index);
+            sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+        });
+        if (ret < 0) { delete ctx; return -1; }
+        return 0;
+    }
+
+    // Unregistered path – allocate a temporary bounce buffer.
+    constexpr size_t kBounceSize = 65536;
+    void* bounce = malloc(kBounceSize);
+    if (!bounce) { errno = ENOMEM; return -1; }
+
+    IouringReqContext* ctx = new IouringReqContext{};
+    ctx->fd        = fd;
+    ctx->socket_id = _socket->id();
+    ctx->op        = IOURING_OP_READ;
+    // Record ownership before the SQE becomes visible to the kernel, so the
+    // context fully describes the in-flight read from the moment it can be
+    // consumed.  PollCq takes the buffer over and wraps it in an IOBuf.
+    ctx->bounce = bounce;
+
+    int ret = SubmitOneSqe([&](struct io_uring_sqe* sqe) {
+        io_uring_prep_read(sqe, fd, bounce, static_cast<unsigned>(kBounceSize),
+                           /*offset=*/0);
+        sqe->user_data = reinterpret_cast<uint64_t>(ctx) | kBrpcCqeTag;
+    });
+    if (ret < 0) {
+        // Detach the buffer first so it is freed exactly once, regardless of
+        // what IouringReqContext's destructor does with |bounce|.
+        ctx->bounce = nullptr;
+        free(bounce);
+        delete ctx;
+        return -1;
+    }
+    return 0;
+}
+
+
+ssize_t IouringEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+    CHECK(from != nullptr);
+    CHECK(ndata > 0);
+
+    if (!IsWritable()) { errno = EAGAIN; return -1; }
+
+    const int fd = _socket->fd();

Review Comment:
   - 这里的 fd 仍然是 TCP 连接的 socket fd，而不是 io_uring 的 ring_fd，对吗？
   - 也就是说，这里 io_uring 只用于数据读写相关的 API 吗？



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to