MalikHou commented on code in PR #3232:
URL: https://github.com/apache/brpc/pull/3232#discussion_r2869059669
##########
src/bthread/butex.cpp:
##########
@@ -295,13 +324,95 @@ inline TaskGroup* get_task_group(TaskControl* c,
bthread_tag_t tag) {
}
// Resume `next_meta` from the local (same-tag) fast path on worker group `g`.
// Pinned tasks are always routed through ready_to_run() so that pin-aware
// scheduling decides the worker; TaskGroup::exchange() could otherwise
// resume them on the wrong one.
inline void run_in_local_task_group(TaskGroup* g, TaskMeta* next_meta, bool nosignal) {
    const bool pinned =
        next_meta->local_pin_enabled && next_meta->local_pin_depth > 0;
    if (pinned || nosignal) {
        g->ready_to_run(next_meta, nosignal);
    } else {
        TaskGroup::exchange(&g, next_meta);
    }
}
+inline bool is_pinned_waiter(const ButexWaiter* bw) {
+ if (bw == NULL || bw->tid == 0) {
+ return false;
+ }
+ const ButexBthreadWaiter* bbw = static_cast<const ButexBthreadWaiter*>(bw);
+ const TaskMeta* meta = bbw->task_meta;
+ return meta != NULL && meta->local_pin_enabled && meta->local_pin_depth >
0;
+}
+
+template <typename ShouldCheck>
+inline bool reject_if_selected_contains_pinned(ButexWaiterList* waiters,
+ const ShouldCheck&
should_check) {
+ size_t index = 0;
+ for (butil::LinkNode<ButexWaiter>* p = waiters->head();
+ p != waiters->end(); p = p->next(), ++index) {
+ ButexWaiter* bw = p->value();
+ if (should_check(bw, index) && is_pinned_waiter(bw)) {
+ butex_strict_reject_count() << 1;
+ errno = EINVAL;
+ return true;
+ }
+ }
+ return false;
+}
+
+// Wake the single waiter parked on butex `arg` and enqueue it onto
+// `target_group`. Per the comment below, this path runs on target_group's
+// owner worker.
+// Returns:
+//   1  - one waiter was woken and queued onto target_group.
+//   0  - no waiter was parked on the butex.
+//  -1  - with errno = EINVAL on invalid arguments, more than one waiter,
+//        a non-bthread waiter, or a waiter/group mismatch; with
+//        errno = EAGAIN when the owner-local pinned runqueue is full (the
+//        waiter is kept on the butex list for a later retry).
+int butex_wake_to_task_group(void* arg, TaskGroup* target_group) {
+    if (arg == NULL || target_group == NULL) {
+        butex_within_invalid_count() << 1;
+        errno = EINVAL;
+        return -1;
+    }
+    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex,
value);
+    ButexBthreadWaiter* bbw = NULL;
+    {
+        BAIDU_SCOPED_LOCK(b->waiter_lock);
+        if (b->waiters.empty()) {
+            butex_within_no_waiter_count() << 1;
+            return 0;
+        }
+        // Only a single parked waiter is supported: reject if the list has
+        // more than one node.
+        butil::LinkNode<ButexWaiter>* head = b->waiters.head();
+        if (head->next() != b->waiters.end()) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        ButexWaiter* bw = head->value();
+        // tid == 0 means this is not a bthread waiter; the cast below would
+        // be invalid, so reject.
+        if (bw->tid == 0) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        bbw = static_cast<ButexBthreadWaiter*>(bw);
+        // The waiter must already belong to target_group: same home group,
+        // same TaskControl and same tag.
+        if (bbw->home_group != target_group ||
+            bbw->control != target_group->control() ||
+            bbw->tag != target_group->tag()) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        // wake_within() runs on target_group's owner worker. For pinned
waiters,
+        // if owner-local pinned runqueue is already full, report EAGAIN and
+        // keep waiter on butex list for next harvest retry, instead of
+        // blocking/spinning here.
+        if (is_pinned_waiter(bw) && target_group->pinned_rq_full()) {
+            errno = EAGAIN;
+            return -1;
+        }
+        // Detach the waiter while still holding waiter_lock so no other
+        // waker can see it on the list.
+        bw->RemoveFromList();
+        bw->container.store(NULL, butil::memory_order_relaxed);
+    }
+
+    // Outside the lock: cancel any pending wait timer, then enqueue with
+    // nosignal = true (the owner worker picks it up without a signal).
+    unsleep_if_necessary(bbw, get_global_timer_thread());
+    target_group->ready_to_run(bbw->task_meta, true);
+    return 1;
Review Comment:
In this code path, `pinned_rq_full()` and the subsequent enqueue are executed
by the same owner worker, so the TOCTOU case you mentioned does not occur here.
`butex_wake_within_active_task()` is only allowed from the active-task hook
of the current `TaskGroup` (validated in `validate_active_task_hook_ctx`). For
pinned waiters, `ready_to_run()` routes to `ready_to_run_pinned_local()` on the
same home worker. Also, `_pinned_rq` is owner-local (not used by steal path,
and remote routing for pinned tasks goes to `_pinned_remote_rq`), so there is
no concurrent producer that can make `_pinned_rq` become full between the check
and enqueue.
So this check is intended as a fast-fail (`EAGAIN`) when the queue is already
full, not as a cross-thread non-blocking CAS gate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]