Copilot commented on code in PR #3232:
URL: https://github.com/apache/brpc/pull/3232#discussion_r2868815640


##########
docs/cn/bthread_active_task.md:
##########
@@ -0,0 +1,322 @@
+# bthread Active Task(实验性/UNSTABLE)
+
+本文介绍当前 brpc 中新增的 **Active Task** 基础设施,以及在服务端请求处理中配合 `butex` 实现:
+
+- 请求处理 bthread 挂起等待(例如等待 io_uring completion)
+- 在 bthread worker 的 active-task hook 中收割 completion
+- 在 hook 内把 waiter 恢复到当前 worker 的 **local runqueue**(不走 `_remote_rq`)
+
+本文描述的是 **当前实现** 的使用方式与边界,接口位于 `bthread/unstable.h`,属于 UNSTABLE API。
+
+## 适用场景
+
+典型场景是“每个 bthread worker 一个本地 reactor”(例如每 worker 一个 io_uring ring):
+
+1. worker 初始化时创建本地 reactor/ring。
+2. 提交异步 IO 后,在私有 `butex` 上通过 `bthread_butex_wait_local` 挂起。
+4. worker 的 active-task hook 收割 completion。
+5. hook 内调用 `bthread_butex_wake_within(ctx, req->butex)` 唤醒 waiter。
+6. waiter bthread 在同一个 worker 上恢复执行(不会被 steal)。

Review Comment:
   The numbered list here skips from 2 to 4, which looks like a documentation 
typo and makes the quick overview harder to follow. Please renumber the items 
sequentially.
   ```suggestion
   3. worker 的 active-task hook 收割 completion。
   4. hook 内调用 `bthread_butex_wake_within(ctx, req->butex)` 唤醒 waiter。
   5. waiter bthread 在同一个 worker 上恢复执行(不会被 steal)。
   ```



##########
test/bthread_active_task_unittest.cpp:
##########
@@ -0,0 +1,1775 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <algorithm>
+#include <cstring>
+#include <cstdlib>
+#include <limits.h>
+#include <new>
+#include <pthread.h>
+#include <stdint.h>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+
+#include "butil/atomicops.h"
+#include "butil/time.h"
+#include "bthread/bthread.h"
+#include "bthread/butex.h"
+#include "bthread/task_control.h"
+#include "bthread/task_group.h"
+#include "bthread/unstable.h"
+
+namespace bthread {
+DECLARE_int32(bthread_active_task_poll_every_nswitch);
+DECLARE_int64(bthread_active_task_idle_wait_ns);
+}
+DECLARE_int32(task_group_runqueue_capacity);
+
+namespace {
+
+enum TestMode {
+    TEST_MODE_IDLE = 0,
+    TEST_MODE_IDLE_WAIT_INTERVAL = 1,
+    TEST_MODE_BUTEX_WAKE_WITHIN = 2,
+    TEST_MODE_BUTEX_WAKE_WITHIN_NULL = 3,
+    TEST_MODE_BUTEX_WAKE_WITHIN_NO_WAITER = 4,
+    TEST_MODE_BUTEX_WAKE_WITHIN_PTHREAD_WAITER = 5,
+    TEST_MODE_BUSY_PERIODIC_POLL_WAKE = 6,
+    TEST_MODE_SCENARIO_REQ_WAKE = 7,
+    TEST_MODE_SCENARIO_REQ_WAKE_BUSY_PERIODIC = 8,
+    TEST_MODE_BUTEX_WAKE_WITHIN_STRICT_CROSS_WORKER_REJECT = 9,
+    TEST_MODE_BUTEX_WAKE_WITHIN_EAGAIN_WHEN_PINNED_RQ_FULL = 10,
+};
+
+enum GenericWakeVariant {
+    GENERIC_WAKE = 0,
+    GENERIC_WAKE_N = 1,
+    GENERIC_WAKE_EXCEPT = 2,
+    GENERIC_REQUEUE = 3,
+};
+
+struct PerWorkerState {
+};
+
+struct MockReqCtx {
+    MockReqCtx()
+        : butex(NULL)
+        , result_ready(0)
+        , wake_rc(0)
+        , wake_errno(0)
+        , waiter_ready(0)
+        , waiter_done(0)
+        , resume_saw_result_ready(0)
+        , wait_rc(0)
+        , wait_errno(0)
+        , waiter_worker_pthread(0)
+        , resume_worker_pthread(0)
+        , hook_worker_pthread(0)
+        , completion_published(0) {}
+
+    void* butex;
+    std::atomic<int> result_ready;
+    std::atomic<int> wake_rc;
+    std::atomic<int> wake_errno;
+    std::atomic<int> waiter_ready;
+    std::atomic<int> waiter_done;
+    std::atomic<int> resume_saw_result_ready;
+    std::atomic<int> wait_rc;
+    std::atomic<int> wait_errno;
+    std::atomic<uint64_t> waiter_worker_pthread;
+    std::atomic<uint64_t> resume_worker_pthread;
+    std::atomic<uint64_t> hook_worker_pthread;
+    std::atomic<int> completion_published;
+};
+
+struct ActiveTaskTestState {
+    ActiveTaskTestState()
+        : mode(TEST_MODE_IDLE)
+        , init_calls(0)
+        , destroy_calls(0)
+        , harvest_calls(0)
+        , butex_ptr(0)
+        , butex_ptr_aux1(0)
+        , butex_ptr_aux2(0)
+        , pending_req_ptr(0)
+        , target_hook_worker_pthread(0)
+        , butex_expected_waiters(0)
+        , butex_wake_started(0)
+        , butex_wake_completed(0)
+        , butex_wake_rc(0)
+        , butex_wake_errno(0)
+        , butex_wake_rc_aux1(0)
+        , butex_wake_errno_aux1(0)
+        , butex_wake_rc_aux2(0)
+        , butex_wake_errno_aux2(0)
+        , hook_wake_harvest_calls(0)
+        , hook_action_inflight(0)
+        , butex_waiter_ready_count(0)
+        , butex_waiter_done_count(0)
+        , butex_waiter_resume_count(0)
+        , butex_waiter_worker_pthread(0)
+        , butex_waiter_resume_worker_pthread(0)
+        , pthread_waiter_ready_count(0)
+        , pthread_waiter_done_count(0)
+        , busy_task_started(0)
+        , busy_task_stop(0)
+        , busy_task_switches(0) {}
+    std::atomic<int> mode;
+    std::atomic<int> init_calls;
+    std::atomic<int> destroy_calls;
+    std::atomic<int> harvest_calls;
+    std::atomic<uintptr_t> butex_ptr;
+    std::atomic<uintptr_t> butex_ptr_aux1;
+    std::atomic<uintptr_t> butex_ptr_aux2;
+    std::atomic<uintptr_t> pending_req_ptr;
+    std::atomic<uint64_t> target_hook_worker_pthread;
+    std::atomic<int> butex_expected_waiters;
+    std::atomic<int> butex_wake_started;
+    std::atomic<int> butex_wake_completed;
+    std::atomic<int> butex_wake_rc;
+    std::atomic<int> butex_wake_errno;
+    std::atomic<int> butex_wake_rc_aux1;
+    std::atomic<int> butex_wake_errno_aux1;
+    std::atomic<int> butex_wake_rc_aux2;
+    std::atomic<int> butex_wake_errno_aux2;
+    std::atomic<int> hook_wake_harvest_calls;
+    std::atomic<int> hook_action_inflight;
+    std::atomic<int> butex_waiter_ready_count;
+    std::atomic<int> butex_waiter_done_count;
+    std::atomic<int> butex_waiter_resume_count;
+    std::atomic<uint64_t> butex_waiter_worker_pthread;
+    std::atomic<uint64_t> butex_waiter_resume_worker_pthread;
+    std::atomic<int> pthread_waiter_ready_count;
+    std::atomic<int> pthread_waiter_done_count;
+    std::atomic<int> busy_task_started;
+    std::atomic<int> busy_task_stop;
+    std::atomic<int> busy_task_switches;
+};
+
+struct PinnedWaitCtx {
+    PinnedWaitCtx()
+        : butex(NULL)
+        , use_timeout(false)
+        , timeout_ms(0)
+        , ready(0)
+        , done(0)
+        , wait_rc(0)
+        , wait_errno(0)
+        , pinned_worker_pthread(0)
+        , resume_worker_pthread(0) {}
+
+    void* butex;
+    bool use_timeout;
+    int timeout_ms;
+    std::atomic<int> ready;
+    std::atomic<int> done;
+    std::atomic<int> wait_rc;
+    std::atomic<int> wait_errno;
+    std::atomic<uint64_t> pinned_worker_pthread;
+    std::atomic<uint64_t> resume_worker_pthread;
+};
+
+ActiveTaskTestState g_state;
+std::atomic<int> g_register_rc(-1);
+std::atomic<int> g_register_once(0);
+bthread::TaskControl* g_shared_tc = NULL;
+std::atomic<int> g_shared_tc_once(0);
+
+void ResetState() {
+    g_state.mode.store(TEST_MODE_IDLE, std::memory_order_release);
+    g_state.init_calls.store(0, std::memory_order_relaxed);
+    g_state.destroy_calls.store(0, std::memory_order_relaxed);
+    g_state.harvest_calls.store(0, std::memory_order_relaxed);
+    g_state.butex_ptr.store(0, std::memory_order_relaxed);
+    g_state.butex_ptr_aux1.store(0, std::memory_order_relaxed);
+    g_state.butex_ptr_aux2.store(0, std::memory_order_relaxed);
+    g_state.pending_req_ptr.store(0, std::memory_order_relaxed);
+    g_state.target_hook_worker_pthread.store(0, std::memory_order_relaxed);
+    g_state.butex_expected_waiters.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_started.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_completed.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_rc.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_errno.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_rc_aux1.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_errno_aux1.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_rc_aux2.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_errno_aux2.store(0, std::memory_order_relaxed);
+    g_state.hook_wake_harvest_calls.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_ready_count.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_done_count.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_resume_count.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_worker_pthread.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_resume_worker_pthread.store(0, 
std::memory_order_relaxed);
+    g_state.pthread_waiter_ready_count.store(0, std::memory_order_relaxed);
+    g_state.pthread_waiter_done_count.store(0, std::memory_order_relaxed);
+    g_state.busy_task_started.store(0, std::memory_order_relaxed);
+    g_state.busy_task_stop.store(0, std::memory_order_relaxed);
+    g_state.busy_task_switches.store(0, std::memory_order_relaxed);
+}
+
+uint64_t PthreadToU64(pthread_t tid) {
+    uint64_t v = 0;
+    memcpy(&v, &tid, std::min(sizeof(v), sizeof(tid)));
+    return v;
+}
+
+bool WaitAtomicAtLeast(const std::atomic<int>& value, int expected, int 
timeout_ms) {
+    for (int i = 0; i < timeout_ms; ++i) {
+        if (value.load(std::memory_order_relaxed) >= expected) {
+            return true;
+        }
+        usleep(1000);
+    }
+    return value.load(std::memory_order_relaxed) >= expected;
+}
+
+bool WaitAtomicEqual(const std::atomic<int>& value, int expected, int 
timeout_ms) {
+    for (int i = 0; i < timeout_ms; ++i) {
+        if (value.load(std::memory_order_relaxed) == expected) {
+            return true;
+        }
+        usleep(1000);
+    }
+    return value.load(std::memory_order_relaxed) == expected;
+}
+
+bool WaitPinnedButexQueued(void* butex, int timeout_ms) {
+    for (int i = 0; i < timeout_ms; ++i) {
+        errno = 0;
+        const int rc = bthread::butex_wake(butex, true);
+        const int err = errno;
+        if (rc == -1 && err == EINVAL) {
+            return true;
+        }
+        if (rc == 0) {
+            usleep(1000);
+            continue;
+        }
+        return false;
+    }
+    return false;
+}
+
+void DrainHookActions() {
+    ASSERT_TRUE(WaitAtomicEqual(g_state.hook_action_inflight, 0, 5000));
+}
+
+void QuiesceHookActionsAfterModeIdle() {
+    DrainHookActions();
+    usleep(1000);
+    DrainHookActions();
+}
+
+void PrepareForCase() {
+    g_state.mode.store(TEST_MODE_IDLE, std::memory_order_release);
+    QuiesceHookActionsAfterModeIdle();
+    ResetState();
+}
+
+bthread::TaskControl& GetSharedTaskControl() {
+    int expected = 0;
+    if (g_shared_tc_once.compare_exchange_strong(
+            expected, 1, std::memory_order_relaxed)) {
+        g_shared_tc = new bthread::TaskControl();
+        CHECK(g_shared_tc != NULL);
+        // Keep one TaskControl instance in this process. Multiple TaskControl
+        // instances expose fixed-name bvars and conflict in CI builds with
+        // BRPC_BTHREAD_TRACER enabled.
+        CHECK_EQ(0, g_shared_tc->init(2));
+        CHECK(WaitAtomicAtLeast(g_state.init_calls, 2, 5000));
+    }
+    CHECK(g_shared_tc != NULL);
+    return *g_shared_tc;
+}
+
+bthread::TaskControl& GetSharedSingleWorkerTaskControl() {
+    return GetSharedTaskControl();
+}
+
+bthread::TaskControl& GetSharedTwoWorkerTaskControl() {
+    return GetSharedTaskControl();
+}

Review Comment:
   These helpers are named as if they return different TaskControl 
configurations, but both currently return the same shared TaskControl 
initialized with concurrency=2. This is misleading for readers and can cause 
incorrect assumptions in future edits; please either rename them to reflect the 
shared 2-worker instance, or actually initialize distinct single-worker vs 
two-worker TaskControls (if feasible).



##########
test/bthread_active_task_unittest.cpp:
##########
@@ -0,0 +1,1775 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <algorithm>
+#include <cstring>
+#include <cstdlib>
+#include <limits.h>
+#include <new>
+#include <pthread.h>
+#include <stdint.h>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+
+#include "butil/atomicops.h"
+#include "butil/time.h"
+#include "bthread/bthread.h"
+#include "bthread/butex.h"
+#include "bthread/task_control.h"
+#include "bthread/task_group.h"
+#include "bthread/unstable.h"
+
+namespace bthread {
+DECLARE_int32(bthread_active_task_poll_every_nswitch);
+DECLARE_int64(bthread_active_task_idle_wait_ns);
+}
+DECLARE_int32(task_group_runqueue_capacity);
+
+namespace {
+
+enum TestMode {
+    TEST_MODE_IDLE = 0,
+    TEST_MODE_IDLE_WAIT_INTERVAL = 1,
+    TEST_MODE_BUTEX_WAKE_WITHIN = 2,
+    TEST_MODE_BUTEX_WAKE_WITHIN_NULL = 3,
+    TEST_MODE_BUTEX_WAKE_WITHIN_NO_WAITER = 4,
+    TEST_MODE_BUTEX_WAKE_WITHIN_PTHREAD_WAITER = 5,
+    TEST_MODE_BUSY_PERIODIC_POLL_WAKE = 6,
+    TEST_MODE_SCENARIO_REQ_WAKE = 7,
+    TEST_MODE_SCENARIO_REQ_WAKE_BUSY_PERIODIC = 8,
+    TEST_MODE_BUTEX_WAKE_WITHIN_STRICT_CROSS_WORKER_REJECT = 9,
+    TEST_MODE_BUTEX_WAKE_WITHIN_EAGAIN_WHEN_PINNED_RQ_FULL = 10,
+};
+
+enum GenericWakeVariant {
+    GENERIC_WAKE = 0,
+    GENERIC_WAKE_N = 1,
+    GENERIC_WAKE_EXCEPT = 2,
+    GENERIC_REQUEUE = 3,
+};
+
+struct PerWorkerState {
+};
+
+struct MockReqCtx {
+    MockReqCtx()
+        : butex(NULL)
+        , result_ready(0)
+        , wake_rc(0)
+        , wake_errno(0)
+        , waiter_ready(0)
+        , waiter_done(0)
+        , resume_saw_result_ready(0)
+        , wait_rc(0)
+        , wait_errno(0)
+        , waiter_worker_pthread(0)
+        , resume_worker_pthread(0)
+        , hook_worker_pthread(0)
+        , completion_published(0) {}
+
+    void* butex;
+    std::atomic<int> result_ready;
+    std::atomic<int> wake_rc;
+    std::atomic<int> wake_errno;
+    std::atomic<int> waiter_ready;
+    std::atomic<int> waiter_done;
+    std::atomic<int> resume_saw_result_ready;
+    std::atomic<int> wait_rc;
+    std::atomic<int> wait_errno;
+    std::atomic<uint64_t> waiter_worker_pthread;
+    std::atomic<uint64_t> resume_worker_pthread;
+    std::atomic<uint64_t> hook_worker_pthread;
+    std::atomic<int> completion_published;
+};
+
+struct ActiveTaskTestState {
+    ActiveTaskTestState()
+        : mode(TEST_MODE_IDLE)
+        , init_calls(0)
+        , destroy_calls(0)
+        , harvest_calls(0)
+        , butex_ptr(0)
+        , butex_ptr_aux1(0)
+        , butex_ptr_aux2(0)
+        , pending_req_ptr(0)
+        , target_hook_worker_pthread(0)
+        , butex_expected_waiters(0)
+        , butex_wake_started(0)
+        , butex_wake_completed(0)
+        , butex_wake_rc(0)
+        , butex_wake_errno(0)
+        , butex_wake_rc_aux1(0)
+        , butex_wake_errno_aux1(0)
+        , butex_wake_rc_aux2(0)
+        , butex_wake_errno_aux2(0)
+        , hook_wake_harvest_calls(0)
+        , hook_action_inflight(0)
+        , butex_waiter_ready_count(0)
+        , butex_waiter_done_count(0)
+        , butex_waiter_resume_count(0)
+        , butex_waiter_worker_pthread(0)
+        , butex_waiter_resume_worker_pthread(0)
+        , pthread_waiter_ready_count(0)
+        , pthread_waiter_done_count(0)
+        , busy_task_started(0)
+        , busy_task_stop(0)
+        , busy_task_switches(0) {}
+    std::atomic<int> mode;
+    std::atomic<int> init_calls;
+    std::atomic<int> destroy_calls;
+    std::atomic<int> harvest_calls;
+    std::atomic<uintptr_t> butex_ptr;
+    std::atomic<uintptr_t> butex_ptr_aux1;
+    std::atomic<uintptr_t> butex_ptr_aux2;
+    std::atomic<uintptr_t> pending_req_ptr;
+    std::atomic<uint64_t> target_hook_worker_pthread;
+    std::atomic<int> butex_expected_waiters;
+    std::atomic<int> butex_wake_started;
+    std::atomic<int> butex_wake_completed;
+    std::atomic<int> butex_wake_rc;
+    std::atomic<int> butex_wake_errno;
+    std::atomic<int> butex_wake_rc_aux1;
+    std::atomic<int> butex_wake_errno_aux1;
+    std::atomic<int> butex_wake_rc_aux2;
+    std::atomic<int> butex_wake_errno_aux2;
+    std::atomic<int> hook_wake_harvest_calls;
+    std::atomic<int> hook_action_inflight;
+    std::atomic<int> butex_waiter_ready_count;
+    std::atomic<int> butex_waiter_done_count;
+    std::atomic<int> butex_waiter_resume_count;
+    std::atomic<uint64_t> butex_waiter_worker_pthread;
+    std::atomic<uint64_t> butex_waiter_resume_worker_pthread;
+    std::atomic<int> pthread_waiter_ready_count;
+    std::atomic<int> pthread_waiter_done_count;
+    std::atomic<int> busy_task_started;
+    std::atomic<int> busy_task_stop;
+    std::atomic<int> busy_task_switches;
+};
+
+struct PinnedWaitCtx {
+    PinnedWaitCtx()
+        : butex(NULL)
+        , use_timeout(false)
+        , timeout_ms(0)
+        , ready(0)
+        , done(0)
+        , wait_rc(0)
+        , wait_errno(0)
+        , pinned_worker_pthread(0)
+        , resume_worker_pthread(0) {}
+
+    void* butex;
+    bool use_timeout;
+    int timeout_ms;
+    std::atomic<int> ready;
+    std::atomic<int> done;
+    std::atomic<int> wait_rc;
+    std::atomic<int> wait_errno;
+    std::atomic<uint64_t> pinned_worker_pthread;
+    std::atomic<uint64_t> resume_worker_pthread;
+};
+
+ActiveTaskTestState g_state;
+std::atomic<int> g_register_rc(-1);
+std::atomic<int> g_register_once(0);
+bthread::TaskControl* g_shared_tc = NULL;
+std::atomic<int> g_shared_tc_once(0);
+
+void ResetState() {
+    g_state.mode.store(TEST_MODE_IDLE, std::memory_order_release);
+    g_state.init_calls.store(0, std::memory_order_relaxed);
+    g_state.destroy_calls.store(0, std::memory_order_relaxed);
+    g_state.harvest_calls.store(0, std::memory_order_relaxed);
+    g_state.butex_ptr.store(0, std::memory_order_relaxed);
+    g_state.butex_ptr_aux1.store(0, std::memory_order_relaxed);
+    g_state.butex_ptr_aux2.store(0, std::memory_order_relaxed);
+    g_state.pending_req_ptr.store(0, std::memory_order_relaxed);
+    g_state.target_hook_worker_pthread.store(0, std::memory_order_relaxed);
+    g_state.butex_expected_waiters.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_started.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_completed.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_rc.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_errno.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_rc_aux1.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_errno_aux1.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_rc_aux2.store(0, std::memory_order_relaxed);
+    g_state.butex_wake_errno_aux2.store(0, std::memory_order_relaxed);
+    g_state.hook_wake_harvest_calls.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_ready_count.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_done_count.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_resume_count.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_worker_pthread.store(0, std::memory_order_relaxed);
+    g_state.butex_waiter_resume_worker_pthread.store(0, 
std::memory_order_relaxed);
+    g_state.pthread_waiter_ready_count.store(0, std::memory_order_relaxed);
+    g_state.pthread_waiter_done_count.store(0, std::memory_order_relaxed);
+    g_state.busy_task_started.store(0, std::memory_order_relaxed);
+    g_state.busy_task_stop.store(0, std::memory_order_relaxed);
+    g_state.busy_task_switches.store(0, std::memory_order_relaxed);
+}
+
+uint64_t PthreadToU64(pthread_t tid) {
+    uint64_t v = 0;
+    memcpy(&v, &tid, std::min(sizeof(v), sizeof(tid)));
+    return v;
+}
+
+bool WaitAtomicAtLeast(const std::atomic<int>& value, int expected, int 
timeout_ms) {
+    for (int i = 0; i < timeout_ms; ++i) {
+        if (value.load(std::memory_order_relaxed) >= expected) {
+            return true;
+        }
+        usleep(1000);
+    }
+    return value.load(std::memory_order_relaxed) >= expected;
+}
+
+bool WaitAtomicEqual(const std::atomic<int>& value, int expected, int 
timeout_ms) {
+    for (int i = 0; i < timeout_ms; ++i) {
+        if (value.load(std::memory_order_relaxed) == expected) {
+            return true;
+        }
+        usleep(1000);
+    }
+    return value.load(std::memory_order_relaxed) == expected;
+}
+
+bool WaitPinnedButexQueued(void* butex, int timeout_ms) {
+    for (int i = 0; i < timeout_ms; ++i) {
+        errno = 0;
+        const int rc = bthread::butex_wake(butex, true);
+        const int err = errno;
+        if (rc == -1 && err == EINVAL) {
+            return true;
+        }
+        if (rc == 0) {
+            usleep(1000);
+            continue;
+        }
+        return false;
+    }
+    return false;
+}
+
+void DrainHookActions() {
+    ASSERT_TRUE(WaitAtomicEqual(g_state.hook_action_inflight, 0, 5000));
+}
+
+void QuiesceHookActionsAfterModeIdle() {
+    DrainHookActions();
+    usleep(1000);
+    DrainHookActions();
+}
+
+void PrepareForCase() {
+    g_state.mode.store(TEST_MODE_IDLE, std::memory_order_release);
+    QuiesceHookActionsAfterModeIdle();
+    ResetState();
+}
+
+bthread::TaskControl& GetSharedTaskControl() {
+    int expected = 0;
+    if (g_shared_tc_once.compare_exchange_strong(
+            expected, 1, std::memory_order_relaxed)) {
+        g_shared_tc = new bthread::TaskControl();
+        CHECK(g_shared_tc != NULL);
+        // Keep one TaskControl instance in this process. Multiple TaskControl
+        // instances expose fixed-name bvars and conflict in CI builds with
+        // BRPC_BTHREAD_TRACER enabled.
+        CHECK_EQ(0, g_shared_tc->init(2));
+        CHECK(WaitAtomicAtLeast(g_state.init_calls, 2, 5000));
+    }
+    CHECK(g_shared_tc != NULL);
+    return *g_shared_tc;
+}
+
+bthread::TaskControl& GetSharedSingleWorkerTaskControl() {
+    return GetSharedTaskControl();
+}
+
+bthread::TaskControl& GetSharedTwoWorkerTaskControl() {
+    return GetSharedTaskControl();
+}
+
+void* TestButexWaitTask(void*) {
+    void* butex = reinterpret_cast<void*>(
+        g_state.butex_ptr.load(std::memory_order_relaxed));
+    if (butex == NULL) {
+        g_state.butex_waiter_done_count.fetch_add(1, 
std::memory_order_relaxed);
+        return NULL;
+    }
+    g_state.butex_waiter_worker_pthread.store(PthreadToU64(pthread_self()),
+                                              std::memory_order_relaxed);
+    g_state.butex_waiter_ready_count.fetch_add(1, std::memory_order_relaxed);
+    const int rc = bthread::butex_wait(butex, 0, NULL);
+    if (rc == 0) {
+        g_state.butex_waiter_resume_count.fetch_add(1, 
std::memory_order_relaxed);
+        
g_state.butex_waiter_resume_worker_pthread.store(PthreadToU64(pthread_self()),
+                                                         
std::memory_order_relaxed);
+    } else if (errno == EWOULDBLOCK) {
+        // Value changed before waiter was queued; still resumed from caller's 
view.
+        g_state.butex_waiter_resume_count.fetch_add(1, 
std::memory_order_relaxed);
+        
g_state.butex_waiter_resume_worker_pthread.store(PthreadToU64(pthread_self()),
+                                                         
std::memory_order_relaxed);
+    }
+    g_state.butex_waiter_done_count.fetch_add(1, std::memory_order_relaxed);
+    return NULL;
+}
+
+void* TestBusyYieldTask(void*) {
+    g_state.busy_task_started.fetch_add(1, std::memory_order_relaxed);
+    while (!g_state.busy_task_stop.load(std::memory_order_relaxed)) {
+        g_state.busy_task_switches.fetch_add(1, std::memory_order_relaxed);
+        bthread_yield();
+    }
+    return NULL;
+}
+
+void* TestPthreadButexWait(void*) {
+    void* butex = reinterpret_cast<void*>(
+        g_state.butex_ptr.load(std::memory_order_relaxed));
+    if (butex == NULL) {
+        g_state.pthread_waiter_done_count.fetch_add(1, 
std::memory_order_relaxed);
+        return NULL;
+    }
+    g_state.pthread_waiter_ready_count.fetch_add(1, std::memory_order_relaxed);
+    const int rc = bthread::butex_wait(butex, 0, NULL);
+    if (rc != 0 && errno != EWOULDBLOCK) {
+        // Still count completion; assertions are on within wake behavior.
+    }
+    g_state.pthread_waiter_done_count.fetch_add(1, std::memory_order_relaxed);
+    return NULL;
+}
+
+void* TestRequestWaitTask(void* arg) {
+    MockReqCtx* req = static_cast<MockReqCtx*>(arg);
+    if (req == NULL || req->butex == NULL) {
+        return NULL;
+    }
+    req->waiter_worker_pthread.store(PthreadToU64(pthread_self()),
+                                     std::memory_order_relaxed);
+    req->waiter_ready.store(1, std::memory_order_release);
+    errno = 0;
+    const int rc = bthread_butex_wait_local(req->butex, 0, NULL);
+    const int err = errno;
+    req->wait_rc.store(rc, std::memory_order_relaxed);
+    req->wait_errno.store(err, std::memory_order_relaxed);
+    req->resume_worker_pthread.store(PthreadToU64(pthread_self()),
+                                     std::memory_order_relaxed);
+    if (req->result_ready.load(std::memory_order_acquire) == 1) {
+        req->resume_saw_result_ready.store(1, std::memory_order_relaxed);
+    }
+    req->waiter_done.store(1, std::memory_order_release);
+    return NULL;
+}
+
+void* TestPinnedButexLocalWaitTask(void*) {
+    void* butex = reinterpret_cast<void*>(
+        g_state.butex_ptr.load(std::memory_order_relaxed));
+    if (butex == NULL) {
+        g_state.butex_waiter_done_count.fetch_add(1, 
std::memory_order_relaxed);
+        return NULL;
+    }
+    const uint64_t home = PthreadToU64(pthread_self());
+    g_state.butex_waiter_worker_pthread.store(home, std::memory_order_relaxed);
+    g_state.butex_waiter_ready_count.fetch_add(1, std::memory_order_relaxed);
+    const int rc = bthread_butex_wait_local(butex, 0, NULL);
+    if (rc == 0 || errno == EWOULDBLOCK) {
+        g_state.butex_waiter_resume_count.fetch_add(1, 
std::memory_order_relaxed);
+        
g_state.butex_waiter_resume_worker_pthread.store(PthreadToU64(pthread_self()),
+                                                         
std::memory_order_relaxed);
+    }
+    g_state.butex_waiter_done_count.fetch_add(1, std::memory_order_relaxed);
+    return NULL;
+}
+
+void* TestPinnedWaitTask(void* arg) {
+    PinnedWaitCtx* ctx = static_cast<PinnedWaitCtx*>(arg);
+    if (ctx == NULL || ctx->butex == NULL) {
+        if (ctx) {
+            ctx->done.store(1, std::memory_order_release);
+        }
+        return NULL;
+    }
+
+    const uint64_t home = PthreadToU64(pthread_self());
+    ctx->pinned_worker_pthread.store(home, std::memory_order_relaxed);
+    ctx->ready.store(1, std::memory_order_release);
+
+    timespec abstime;
+    const timespec* pabstime = NULL;
+    if (ctx->use_timeout) {
+        abstime = butil::milliseconds_from_now(ctx->timeout_ms);
+        pabstime = &abstime;
+    }
+    errno = 0;
+    const int wait_rc = bthread_butex_wait_local(ctx->butex, 0, pabstime);
+    const int wait_errno = errno;
+    ctx->wait_rc.store(wait_rc, std::memory_order_relaxed);
+    ctx->wait_errno.store(wait_errno, std::memory_order_relaxed);
+    ctx->resume_worker_pthread.store(PthreadToU64(pthread_self()),
+                                     std::memory_order_relaxed);
+    ctx->done.store(1, std::memory_order_release);
+    return NULL;
+}
+
+bool MaybeRunWithinWakeFromHook(const bthread_active_task_ctx_t* ctx,
+                                int mode,
+                                bool* skip_park_out) {
+    struct ScopedHookInflight {
+        ScopedHookInflight() {
+            g_state.hook_action_inflight.fetch_add(1, 
std::memory_order_relaxed);
+        }
+        ~ScopedHookInflight() {
+            g_state.hook_action_inflight.fetch_sub(1, 
std::memory_order_relaxed);
+        }
+    } scoped_inflight;
+
+    if (mode != TEST_MODE_BUTEX_WAKE_WITHIN &&
+        mode != TEST_MODE_BUTEX_WAKE_WITHIN_NULL &&
+        mode != TEST_MODE_BUTEX_WAKE_WITHIN_NO_WAITER &&
+        mode != TEST_MODE_BUTEX_WAKE_WITHIN_PTHREAD_WAITER &&
+        mode != TEST_MODE_BUSY_PERIODIC_POLL_WAKE &&
+        mode != TEST_MODE_SCENARIO_REQ_WAKE &&
+        mode != TEST_MODE_SCENARIO_REQ_WAKE_BUSY_PERIODIC &&
+        mode != TEST_MODE_BUTEX_WAKE_WITHIN_STRICT_CROSS_WORKER_REJECT &&
+        mode != TEST_MODE_BUTEX_WAKE_WITHIN_EAGAIN_WHEN_PINNED_RQ_FULL) {
+        return false;
+    }
+
+    const uint64_t target_worker = g_state.target_hook_worker_pthread.load(
+        std::memory_order_relaxed);
+    if (target_worker != 0 && PthreadToU64(ctx->worker_pthread) != 
target_worker) {
+        return false;
+    }
+
+    const bool is_scenario_req_wake =
+        (mode == TEST_MODE_SCENARIO_REQ_WAKE ||
+         mode == TEST_MODE_SCENARIO_REQ_WAKE_BUSY_PERIODIC);
+
+    if (mode == TEST_MODE_BUTEX_WAKE_WITHIN ||
+        mode == TEST_MODE_BUTEX_WAKE_WITHIN_STRICT_CROSS_WORKER_REJECT ||
+        mode == TEST_MODE_BUTEX_WAKE_WITHIN_EAGAIN_WHEN_PINNED_RQ_FULL ||
+        mode == TEST_MODE_BUSY_PERIODIC_POLL_WAKE) {
+        const int expected_waiters =
+            g_state.butex_expected_waiters.load(std::memory_order_relaxed);
+        if (g_state.butex_waiter_ready_count.load(std::memory_order_relaxed) <
+            expected_waiters) {
+            return true;
+        }
+    } else if (mode == TEST_MODE_BUTEX_WAKE_WITHIN_PTHREAD_WAITER) {
+        if (g_state.pthread_waiter_ready_count.load(std::memory_order_relaxed) 
< 1) {
+            return true;
+        }
+    } else if (is_scenario_req_wake) {
+        MockReqCtx* req = reinterpret_cast<MockReqCtx*>(
+            g_state.pending_req_ptr.load(std::memory_order_acquire));
+        if (req == NULL ||
+            req->waiter_ready.load(std::memory_order_acquire) < 1 ||
+            req->completion_published.load(std::memory_order_acquire) == 0) {
+            return true;
+        }
+    }
+
+    int expected = 0;
+    if (!g_state.butex_wake_started.compare_exchange_strong(
+            expected, 1, std::memory_order_relaxed)) {
+        return true;
+    }
+
+    g_state.hook_wake_harvest_calls.fetch_add(1, std::memory_order_relaxed);
+
+    void* butex = NULL;
+    MockReqCtx* req = NULL;
+    if (mode == TEST_MODE_BUTEX_WAKE_WITHIN_EAGAIN_WHEN_PINNED_RQ_FULL) {
+        void* butex0 = reinterpret_cast<void*>(
+            g_state.butex_ptr.load(std::memory_order_relaxed));
+        void* butex1 = reinterpret_cast<void*>(
+            g_state.butex_ptr_aux1.load(std::memory_order_relaxed));
+        void* butex2 = reinterpret_cast<void*>(
+            g_state.butex_ptr_aux2.load(std::memory_order_relaxed));
+        errno = 0;
+        const int rc0 = bthread_butex_wake_within(ctx, butex0);
+        const int err0 = errno;
+        errno = 0;
+        const int rc1 = bthread_butex_wake_within(ctx, butex1);
+        const int err1 = errno;
+        errno = 0;
+        const int rc2 = bthread_butex_wake_within(ctx, butex2);
+        const int err2 = errno;
+
+        g_state.butex_wake_rc.store(rc0, std::memory_order_relaxed);
+        g_state.butex_wake_errno.store(err0, std::memory_order_relaxed);
+        g_state.butex_wake_rc_aux1.store(rc1, std::memory_order_relaxed);
+        g_state.butex_wake_errno_aux1.store(err1, std::memory_order_relaxed);
+        g_state.butex_wake_rc_aux2.store(rc2, std::memory_order_relaxed);
+        g_state.butex_wake_errno_aux2.store(err2, std::memory_order_relaxed);
+
+        if (!(rc0 == 1 && rc1 == 1 && rc2 == -1 && err2 == EAGAIN)) {
+            // setup race: retry in next harvest round.
+            g_state.butex_wake_started.store(0, std::memory_order_relaxed);
+            return true;
+        }
+        g_state.butex_wake_completed.fetch_add(1, std::memory_order_relaxed);
+        if (skip_park_out) {
+            *skip_park_out = true;
+        }
+        return true;
+    }
+
+    if (is_scenario_req_wake) {
+        req = reinterpret_cast<MockReqCtx*>(
+            g_state.pending_req_ptr.load(std::memory_order_acquire));
+        if (req == NULL) {
+            g_state.butex_wake_started.store(0, std::memory_order_relaxed);
+            return true;
+        }
+        req->hook_worker_pthread.store(PthreadToU64(ctx->worker_pthread),
+                                       std::memory_order_relaxed);
+        req->result_ready.store(1, std::memory_order_release);
+        butex = req->butex;
+    } else {
+        butex = reinterpret_cast<void*>(
+            g_state.butex_ptr.load(std::memory_order_relaxed));
+        if (mode == TEST_MODE_BUTEX_WAKE_WITHIN_NULL) {
+            butex = NULL;
+        }
+    }
+
+    errno = 0;
+    const int rc = bthread_butex_wake_within(ctx, butex);
+    const int err = errno;
+    g_state.butex_wake_rc.store(rc, std::memory_order_relaxed);
+    g_state.butex_wake_errno.store(err, std::memory_order_relaxed);
+
+    bool done = true;
+    if (is_scenario_req_wake) {
+        done = (rc == 1);
+    } else if (mode == TEST_MODE_BUTEX_WAKE_WITHIN ||
+        mode == TEST_MODE_BUTEX_WAKE_WITHIN_STRICT_CROSS_WORKER_REJECT ||
+        mode == TEST_MODE_BUSY_PERIODIC_POLL_WAKE) {
+        const int expected_waiters =
+            g_state.butex_expected_waiters.load(std::memory_order_relaxed);
+        if (mode == TEST_MODE_BUTEX_WAKE_WITHIN_STRICT_CROSS_WORKER_REJECT) {
+            done = (rc == -1 && err == EINVAL);
+        } else if (expected_waiters == 1) {
+            done = (rc == 1);
+        } else if (expected_waiters > 1) {
+            done = (rc == -1 && err == EINVAL) || (rc == 1);
+        }
+    } else if (mode == TEST_MODE_BUTEX_WAKE_WITHIN_PTHREAD_WAITER) {
+        done = (rc == -1 && err == EINVAL);
+    }
+
+    if (!done) {
+        g_state.butex_wake_started.store(0, std::memory_order_relaxed);
+        return true;
+    }
+
+    if (req != NULL) {
+        req->wake_rc.store(rc, std::memory_order_relaxed);
+        req->wake_errno.store(err, std::memory_order_relaxed);
+    }
+    g_state.butex_wake_completed.fetch_add(1, std::memory_order_relaxed);
+    if (skip_park_out) {
+        *skip_park_out = true;
+    }
+    return true;
+}
+
+// worker_init hook for the test active-task type. Allocates a PerWorkerState
+// for the calling worker and publishes it via |worker_local|; bumps
+// g_state.init_calls so the test driver can wait until every worker has been
+// initialized. |ctx| is unused here. Returns ENOMEM if allocation fails,
+// 0 on success.
+int ActiveTaskWorkerInit(void** worker_local,
+                         const bthread_active_task_ctx_t* ctx,
+                         void*) {
+    (void)ctx;
+    // std::nothrow: report failure via the ENOMEM return code, not a throw.
+    PerWorkerState* s = new (std::nothrow) PerWorkerState;
+    if (s == NULL) {
+        return ENOMEM;
+    }
+    *worker_local = s;
+    g_state.init_calls.fetch_add(1, std::memory_order_relaxed);
+    return 0;
+}
+
+// worker_destroy hook: frees the PerWorkerState allocated by
+// ActiveTaskWorkerInit and bumps g_state.destroy_calls so the test driver
+// can assert destroy ran exactly once per worker at TaskControl teardown.
+void ActiveTaskWorkerDestroy(void* worker_local,
+                             const bthread_active_task_ctx_t*,
+                             void*) {
+    delete static_cast<PerWorkerState*>(worker_local);
+    g_state.destroy_calls.fetch_add(1, std::memory_order_relaxed);
+}
+
+// harvest hook: invoked periodically on each bthread worker. Counts the call
+// (g_state.harvest_calls), reads the current test mode, and in the wake-test
+// modes delegates to MaybeRunWithinWakeFromHook which performs the actual
+// bthread_butex_wake_within() calls. The non-zero return when |skip_park| is
+// set appears to ask the worker not to park after this round — confirm
+// against the active-task hook contract in bthread/unstable.h.
+int ActiveTaskHarvest(void* worker_local,
+                      const bthread_active_task_ctx_t* ctx) {
+    g_state.harvest_calls.fetch_add(1, std::memory_order_relaxed);
+    (void)worker_local;
+    const int mode = g_state.mode.load(std::memory_order_acquire);
+    if (mode == TEST_MODE_IDLE_WAIT_INTERVAL) {
+        // Idle-wait-interval test only counts harvest invocations.
+        return 0;
+    }
+    bool skip_park = false;
+    if (MaybeRunWithinWakeFromHook(ctx, mode, &skip_park)) {
+        return skip_park ? 1 : 0;
+    }
+    return 0;
+}
+
+// Registers the unittest's active-task type (init/destroy/harvest hooks
+// above) with bthread. Must run before bthread initialization — see
+// ChildCheckRegisterAfterInitRejected, which verifies late registration
+// fails with EPERM. Returns bthread_register_active_task_type()'s rc.
+int RegisterTestActiveTaskType() {
+    bthread_active_task_type_t type;
+    memset(&type, 0, sizeof(type));
+    // struct_size enables forward-compatible versioning of the struct.
+    type.struct_size = sizeof(type);
+    type.name = "active_task_unittest";
+    type.worker_init = ActiveTaskWorkerInit;
+    type.worker_destroy = ActiveTaskWorkerDestroy;
+    type.harvest = ActiveTaskHarvest;
+    return bthread_register_active_task_type(&type);
+}
+
+// Trivial bthread body that returns immediately; used only to force bthread
+// (and TaskControl) initialization as a side effect of starting it.
+void* JustExit(void*) {
+    return NULL;
+}
+
+// Child-process check: once bthread has been initialized (triggered here by
+// starting and joining a throwaway bthread), registering a new active-task
+// type must be rejected with EPERM. Returns 0 on success; 2 if the bthread
+// could not be started, 3 if registration did not return EPERM.
+int ChildCheckRegisterAfterInitRejected() {
+    bthread_t tid = INVALID_BTHREAD;
+    if (bthread_start_background(&tid, NULL, JustExit, NULL) != 0) {
+        return 2;
+    }
+    bthread_join(tid, NULL);
+
+    bthread_active_task_type_t type;
+    memset(&type, 0, sizeof(type));
+    type.struct_size = sizeof(type);
+    type.name = "active_task_after_init";
+    type.harvest = ActiveTaskHarvest;
+    const int rc = bthread_register_active_task_type(&type);
+    return (rc == EPERM ? 0 : 3);
+}
+
+// Child-process check covering three properties of the active-task hooks:
+// (1) worker_init runs once on each of the 2 workers of a fresh TaskControl;
+// (2) with FLAGS_bthread_active_task_idle_wait_ns lowered to 1ms, idle
+//     workers keep invoking harvest periodically (>= 3 more calls observed);
+// (3) worker_destroy runs once per worker when the TaskControl is torn down
+//     at the end of the inner scope.
+// Returns 0 on success, or a distinct code (20-25) naming the failed step.
+int ChildCheckLocalWorkerInitDestroyAndIdleWaitInterval() {
+    if (g_register_rc.load(std::memory_order_relaxed) != 0) {
+        return 20;
+    }
+    PrepareForCase();
+    {
+        bthread::TaskControl tc;
+        if (tc.init(2) != 0) {
+            return 21;
+        }
+        if (!WaitAtomicAtLeast(g_state.init_calls, 2, 5000)) {
+            return 22;
+        }
+        const int harvest_snapshot = 
g_state.harvest_calls.load(std::memory_order_relaxed);
+        const int64_t old_idle_wait_ns = 
bthread::FLAGS_bthread_active_task_idle_wait_ns;
+        bthread::FLAGS_bthread_active_task_idle_wait_ns = 1000 * 1000;  // 1ms
+        g_state.mode.store(TEST_MODE_IDLE_WAIT_INTERVAL, 
std::memory_order_release);
+        tc.signal_task(2, BTHREAD_TAG_DEFAULT);
+        if (!WaitAtomicAtLeast(g_state.harvest_calls, harvest_snapshot + 3, 
5000)) {
+            bthread::FLAGS_bthread_active_task_idle_wait_ns = old_idle_wait_ns;
+            return 23;
+        }
+        bthread::FLAGS_bthread_active_task_idle_wait_ns = old_idle_wait_ns;
+        g_state.mode.store(TEST_MODE_IDLE, std::memory_order_release);
+        // Wait for in-flight hook actions to drain before tearing down tc.
+        if (!WaitAtomicEqual(g_state.hook_action_inflight, 0, 5000)) {
+            return 25;
+        }
+    }
+    return (g_state.destroy_calls.load(std::memory_order_relaxed) == 2 ? 0 : 
24);
+}
+
+// Child-process check for the EAGAIN backpressure path of
+// bthread_butex_wake_within() when the worker-local pinned runqueue is full.
+// Setup: cap FLAGS_task_group_runqueue_capacity at 2, run a single-worker
+// TaskControl, and park three pinned waiter bthreads on three private
+// butexes. Phase 1 (EAGAIN mode): the harvest hook wakes all three in one
+// round — the first two must succeed (rc == 1) and the third must fail with
+// rc == -1 / errno == EAGAIN because the pinned rq only has room for two.
+// Phase 2 (normal mode): re-arm the hook state and wake the third waiter on
+// its own, which must now succeed. Cleanup always restores TEST_MODE_IDLE,
+// joins the waiters and destroys the butexes. Returns 0 on success or a
+// distinct code (30-43) naming the failed step.
+int ChildCheckWakeWithinEagainWhenPinnedRqFull() {
+    if (g_register_rc.load(std::memory_order_relaxed) != 0) {
+        return 30;
+    }
+    const int32_t old_runqueue_capacity = FLAGS_task_group_runqueue_capacity;
+    FLAGS_task_group_runqueue_capacity = 2;
+
+    int ret = 0;
+    PrepareForCase();
+    {
+        bthread::TaskControl tc;
+        if (tc.init(1) != 0) {
+            ret = 31;
+        } else if (!WaitAtomicAtLeast(g_state.init_calls, 1, 5000)) {
+            ret = 32;
+        } else {
+            bthread::TaskGroup* tg = tc.choose_one_group(BTHREAD_TAG_DEFAULT);
+            if (tg == NULL) {
+                ret = 33;
+            } else {
+                PinnedWaitCtx ctx[3];
+                bthread_t tids[3] = {
+                    INVALID_BTHREAD, INVALID_BTHREAD, INVALID_BTHREAD };
+                void* butexes[3] = { NULL, NULL, NULL };
+                // Start three pinned waiters and wait until each is queued
+                // on its butex before starting the next.
+                for (int i = 0; i < 3; ++i) {
+                    butexes[i] = bthread::butex_create();
+                    if (butexes[i] == NULL) {
+                        ret = 34;
+                        break;
+                    }
+                    static_cast<butil::atomic<int>*>(butexes[i])->store(
+                        0, butil::memory_order_relaxed);
+                    ctx[i].butex = butexes[i];
+                    if (tg->start_background<true>(&tids[i], NULL,
+                                                   TestPinnedWaitTask, 
&ctx[i]) != 0) {
+                        ret = 35;
+                        break;
+                    }
+                    tg->flush_nosignal_tasks();
+                    if (!WaitAtomicAtLeast(ctx[i].ready, 1, 5000)) {
+                        ret = 36;
+                        break;
+                    }
+                    if (!WaitPinnedButexQueued(butexes[i], 5000)) {
+                        ret = 37;
+                        break;
+                    }
+                }
+
+                if (ret == 0) {
+                    // |home| is the worker the waiters are pinned to; the
+                    // hook is restricted to run only on that worker.
+                    const uint64_t home = ctx[0].pinned_worker_pthread.load(
+                        std::memory_order_relaxed);
+                    if (home == 0) {
+                        ret = 38;
+                    } else {
+                        
g_state.butex_ptr.store(reinterpret_cast<uintptr_t>(butexes[0]),
+                                                std::memory_order_relaxed);
+                        
g_state.butex_ptr_aux1.store(reinterpret_cast<uintptr_t>(butexes[1]),
+                                                     
std::memory_order_relaxed);
+                        
g_state.butex_ptr_aux2.store(reinterpret_cast<uintptr_t>(butexes[2]),
+                                                     
std::memory_order_relaxed);
+                        g_state.butex_expected_waiters.store(0, 
std::memory_order_relaxed);
+                        g_state.target_hook_worker_pthread.store(home,
+                                                                 
std::memory_order_relaxed);
+                        g_state.mode.store(
+                            
TEST_MODE_BUTEX_WAKE_WITHIN_EAGAIN_WHEN_PINNED_RQ_FULL,
+                            std::memory_order_release);
+                        tc.signal_task(1, BTHREAD_TAG_DEFAULT);
+                        if (!WaitAtomicAtLeast(g_state.butex_wake_completed, 
1, 5000)) {
+                            ret = 39;
+                        } else if 
(g_state.butex_wake_rc.load(std::memory_order_relaxed) != 1 ||
+                                   
g_state.butex_wake_rc_aux1.load(std::memory_order_relaxed) != 1 ||
+                                   
g_state.butex_wake_rc_aux2.load(std::memory_order_relaxed) != -1 ||
+                                   
g_state.butex_wake_errno_aux2.load(std::memory_order_relaxed) != EAGAIN) {
+                            ret = 40;
+                        } else {
+                            // Phase 2: reset hook state, then wake the
+                            // remaining (third) waiter in normal mode.
+                            g_state.mode.store(TEST_MODE_IDLE, 
std::memory_order_release);
+                            QuiesceHookActionsAfterModeIdle();
+                            g_state.butex_wake_started.store(0, 
std::memory_order_relaxed);
+                            g_state.butex_wake_completed.store(0, 
std::memory_order_relaxed);
+                            g_state.butex_wake_rc.store(0, 
std::memory_order_relaxed);
+                            g_state.butex_wake_errno.store(0, 
std::memory_order_relaxed);
+                            g_state.target_hook_worker_pthread.store(home,
+                                                                     
std::memory_order_relaxed);
+                            g_state.butex_ptr.store(
+                                reinterpret_cast<uintptr_t>(butexes[2]),
+                                std::memory_order_relaxed);
+                            g_state.mode.store(TEST_MODE_BUTEX_WAKE_WITHIN,
+                                               std::memory_order_release);
+                            tc.signal_task(1, BTHREAD_TAG_DEFAULT);
+                            if 
(!WaitAtomicAtLeast(g_state.butex_wake_completed, 1, 5000)) {
+                                ret = 41;
+                            } else if 
(g_state.butex_wake_rc.load(std::memory_order_relaxed) != 1) {
+                                ret = 42;
+                            }
+                        }
+                    }
+                }
+
+                // Unconditional cleanup: idle the hooks, join all waiters,
+                // destroy the butexes and clear the shared pointers.
+                g_state.mode.store(TEST_MODE_IDLE, std::memory_order_release);
+                g_state.target_hook_worker_pthread.store(0, 
std::memory_order_relaxed);
+                QuiesceHookActionsAfterModeIdle();
+                for (int i = 0; i < 3; ++i) {
+                    if (tids[i] != INVALID_BTHREAD) {
+                        if (bthread_join(tids[i], NULL) != 0 && ret == 0) {
+                            ret = 43;
+                        }
+                    }
+                    if (butexes[i] != NULL) {
+                        bthread::butex_destroy(butexes[i]);
+                    }
+                }
+                g_state.butex_ptr.store(0, std::memory_order_relaxed);
+                g_state.butex_ptr_aux1.store(0, std::memory_order_relaxed);
+                g_state.butex_ptr_aux2.store(0, std::memory_order_relaxed);
+            }
+        }
+    }
+    FLAGS_task_group_runqueue_capacity = old_runqueue_capacity;
+    return ret;
+}
+
+int RunChildMode(const char* mode) {
+    pid_t pid = fork();
+    if (pid < 0) {
+        return -1;
+    }
+    if (pid == 0) {
+        char self_path[PATH_MAX];
+        const ssize_t n = readlink("/proc/self/exe", self_path, 
sizeof(self_path) - 1);
+        if (n <= 0) {
+            _exit(4);
+        }
+        self_path[n] = '\0';
+        setenv("BRPC_ACTIVE_TASK_UT_CHILD_MODE", mode, 1);
+        char* const argv[] = { self_path, NULL };
+        execv(self_path, argv);
+        _exit(5);

Review Comment:
   `RunChildMode` relies on `readlink("/proc/self/exe", ...)`, which is 
Linux-specific. If these tests are ever run on macOS (CI currently compiles on 
macOS), they will fail at runtime; consider using a portable way to get the 
current executable path (e.g. `_NSGetExecutablePath` on macOS, or fall back to 
`argv[0]` passed into the test binary).



##########
src/bthread/butex.cpp:
##########
@@ -295,13 +324,95 @@ inline TaskGroup* get_task_group(TaskControl* c, 
bthread_tag_t tag) {
 }
 
inline void run_in_local_task_group(TaskGroup* g, TaskMeta* next_meta, bool 
nosignal) {
+    // Pinned tasks must go through pin-aware routing even on same-tag local 
fast
+    // paths, otherwise TaskGroup::exchange() may resume them on the wrong 
worker.
+    if (next_meta->local_pin_enabled && next_meta->local_pin_depth > 0) {
+        g->ready_to_run(next_meta, nosignal);
+        return;
+    }
     // Non-pinned tasks keep the pre-existing behavior: switch to next_meta
     // immediately via exchange(), unless the caller batches with nosignal.
     if (!nosignal) {
         TaskGroup::exchange(&g, next_meta);
     } else {
         g->ready_to_run(next_meta, nosignal);
     }
 }
 
+// Returns true iff |bw| is a bthread waiter whose TaskMeta is currently
+// pinned to its home worker (local_pin_enabled with a positive pin depth).
+// Waiters with tid == 0 (non-bthread waiters) are never pinned; the
+// downcast to ButexBthreadWaiter is only valid once tid != 0.
+inline bool is_pinned_waiter(const ButexWaiter* bw) {
+    if (bw == NULL || bw->tid == 0) {
+        return false;
+    }
+    const ButexBthreadWaiter* bbw = static_cast<const ButexBthreadWaiter*>(bw);
+    const TaskMeta* meta = bbw->task_meta;
+    return meta != NULL && meta->local_pin_enabled && meta->local_pin_depth > 
0;
+}
+
+// Scans |waiters| and, for each entry that the |should_check(bw, index)|
+// predicate selects, rejects the whole operation if that waiter is pinned:
+// bumps the strict-reject counter, sets errno = EINVAL and returns true.
+// Returns false when no selected waiter is pinned. |index| is the waiter's
+// 0-based position in the list, letting callers check only a prefix/subset.
+template <typename ShouldCheck>
+inline bool reject_if_selected_contains_pinned(ButexWaiterList* waiters,
+                                                const ShouldCheck& 
should_check) {
+    size_t index = 0;
+    for (butil::LinkNode<ButexWaiter>* p = waiters->head();
+         p != waiters->end(); p = p->next(), ++index) {
+        ButexWaiter* bw = p->value();
+        if (should_check(bw, index) && is_pinned_waiter(bw)) {
+            butex_strict_reject_count() << 1;
+            errno = EINVAL;
+            return true;
+        }
+    }
+    return false;
+}
+
+int butex_wake_to_task_group(void* arg, TaskGroup* target_group) {
+    if (arg == NULL || target_group == NULL) {
+        butex_within_invalid_count() << 1;
+        errno = EINVAL;
+        return -1;
+    }
+    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, 
value);
+    ButexBthreadWaiter* bbw = NULL;
+    {
+        BAIDU_SCOPED_LOCK(b->waiter_lock);
+        if (b->waiters.empty()) {
+            butex_within_no_waiter_count() << 1;
+            return 0;
+        }
+        butil::LinkNode<ButexWaiter>* head = b->waiters.head();
+        if (head->next() != b->waiters.end()) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        ButexWaiter* bw = head->value();
+        if (bw->tid == 0) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        bbw = static_cast<ButexBthreadWaiter*>(bw);
+        if (bbw->home_group != target_group ||
+            bbw->control != target_group->control() ||
+            bbw->tag != target_group->tag()) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        // wake_within() runs on target_group's owner worker. For pinned 
waiters,
+        // if owner-local pinned runqueue is already full, report EAGAIN and
+        // keep waiter on butex list for next harvest retry, instead of
+        // blocking/spinning here.
+        if (is_pinned_waiter(bw) && target_group->pinned_rq_full()) {
+            errno = EAGAIN;
+            return -1;
+        }
+        bw->RemoveFromList();
+        bw->container.store(NULL, butil::memory_order_relaxed);
+    }
+
+    unsleep_if_necessary(bbw, get_global_timer_thread());
+    target_group->ready_to_run(bbw->task_meta, true);
+    return 1;

Review Comment:
   `butex_wake_to_task_group` tries to avoid blocking in active-task hooks by 
checking `pinned_rq_full()` and returning `EAGAIN`, but the check isn't atomic 
with the subsequent enqueue (`target_group->ready_to_run(...)`). If the pinned 
runqueue becomes full between the check and `ready_to_run`, `push_pinned_rq` 
may spin/sleep inside the hook, violating the non-blocking intent. Consider 
adding a non-blocking/try-enqueue path for within-wake (or re-check/return 
`EAGAIN` on enqueue failure) so hooks never block.



##########
src/bthread/bthread.cpp:
##########
@@ -85,6 +87,8 @@ pthread_mutex_t g_task_control_mutex = 
PTHREAD_MUTEX_INITIALIZER;
 // Notice that we can't declare the variable as atomic<TaskControl*> which
 // are not constructed before main().
 TaskControl* g_task_control = NULL;
+// File-scope registry of active-task types, with a mutex serializing
+// registration. NOTE(review): per the tests, registration is only accepted
+// before bthread initialization (EPERM afterwards) — confirm at use sites.
+static pthread_mutex_t g_active_task_registry_mutex = 
PTHREAD_MUTEX_INITIALIZER;
+static std::vector<bthread_active_task_type_t> g_active_task_types;
 

Review Comment:
   `std::vector` is used for `g_active_task_types`, but this file doesn't 
include `<vector>`. This will fail to compile on toolchains that don't pull 
`<vector>` transitively; please add an explicit `#include <vector>` in the 
includes section.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to