This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 341ad4d6 butex_wake* support nosignal flag, use bthread_flush signal batch (#1751)
341ad4d6 is described below
commit 341ad4d67606e57cd24260796a32dd72a1896ce2
Author: Yang,Liming <[email protected]>
AuthorDate: Mon Jun 6 15:33:10 2022 +0800
butex_wake* support nosignal flag, use bthread_flush signal batch (#1751)
---
src/bthread/butex.cpp | 44 +++++++++++++++++++++++++++++------------
src/bthread/butex.h | 4 ++--
src/bthread/countdown_event.cpp | 4 ++--
src/bthread/countdown_event.h | 3 ++-
4 files changed, 37 insertions(+), 18 deletions(-)
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index f8d947d0..cc43702a 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -166,6 +166,7 @@ int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
}
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
+extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group_nosignal;
// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
@@ -256,12 +257,29 @@ void butex_destroy(void* butex) {
butil::return_object(b);
}
-inline TaskGroup* get_task_group(TaskControl* c) {
- TaskGroup* g = tls_task_group;
- return g ? g : c->choose_one_group();
+inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
+ TaskGroup* g;
+ if (nosignal) {
+ g = tls_task_group_nosignal;
+ if (NULL == g) {
+ g = c->choose_one_group();
+ tls_task_group_nosignal = g;
+ }
+ } else {
+ g = tls_task_group ? tls_task_group : c->choose_one_group();
+ }
+ return g;
+}
+
+inline void run_in_local_task_group(TaskGroup* g, bthread_t tid, bool nosignal) {
+ if (!nosignal) {
+ TaskGroup::exchange(&g, tid);
+ } else {
+ g->ready_to_run(tid, nosignal);
+ }
}
-int butex_wake(void* arg) {
+int butex_wake(void* arg, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
@@ -279,16 +297,16 @@ int butex_wake(void* arg) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
- TaskGroup* g = tls_task_group;
- if (g) {
- TaskGroup::exchange(&g, bbw->tid);
+ TaskGroup* g = get_task_group(bbw->control, nosignal);
+ if (g == tls_task_group) {
+ run_in_local_task_group(g, bbw->tid, nosignal);
} else {
- bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
+ g->ready_to_run_remote(bbw->tid, nosignal);
}
return 1;
}
-int butex_wake_all(void* arg) {
+int butex_wake_all(void* arg, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiterList bthread_waiters;
@@ -324,7 +342,7 @@ int butex_wake_all(void* arg) {
next->RemoveFromList();
unsleep_if_necessary(next, get_global_timer_thread());
++nwakeup;
- TaskGroup* g = get_task_group(next->control);
+ TaskGroup* g = get_task_group(next->control, nosignal);
const int saved_nwakeup = nwakeup;
while (!bthread_waiters.empty()) {
// pop reversely
@@ -335,13 +353,13 @@ int butex_wake_all(void* arg) {
g->ready_to_run_general(w->tid, true);
++nwakeup;
}
- if (saved_nwakeup != nwakeup) {
+ if (!nosignal && saved_nwakeup != nwakeup) {
g->flush_nosignal_tasks_general();
}
if (g == tls_task_group) {
- TaskGroup::exchange(&g, next->tid);
+ run_in_local_task_group(g, next->tid, nosignal);
} else {
- g->ready_to_run_remote(next->tid);
+ g->ready_to_run_remote(next->tid, nosignal);
}
return nwakeup;
}
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index 4e650914..db4ec341 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -46,11 +46,11 @@ void butex_destroy(void* butex);
// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
-int butex_wake(void* butex);
+int butex_wake(void* butex, bool nosignal = false);
// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
-int butex_wake_all(void* butex);
+int butex_wake_all(void* butex, bool nosignal = false);
// Wake up all threads waiting on |butex| except a bthread whose identifier
// is |excluded_bthread|. This function does not yield.
diff --git a/src/bthread/countdown_event.cpp b/src/bthread/countdown_event.cpp
index 3d0d652d..0d9a93fe 100644
--- a/src/bthread/countdown_event.cpp
+++ b/src/bthread/countdown_event.cpp
@@ -39,7 +39,7 @@ CountdownEvent::~CountdownEvent() {
butex_destroy(_butex);
}
-void CountdownEvent::signal(int sig) {
+void CountdownEvent::signal(int sig, bool flush) {
// Have to save _butex, *this is probably dereferenced by the wait thread
// which sees fetch_sub
void* const saved_butex = _butex;
@@ -50,7 +50,7 @@ void CountdownEvent::signal(int sig) {
return;
}
LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
- butex_wake_all(saved_butex);
+ butex_wake_all(saved_butex, flush);
}
int CountdownEvent::wait() {
diff --git a/src/bthread/countdown_event.h b/src/bthread/countdown_event.h
index d1ee669d..acb205c6 100644
--- a/src/bthread/countdown_event.h
+++ b/src/bthread/countdown_event.h
@@ -39,7 +39,8 @@ public:
void reset(int v = 1);
// Decrease the counter by |sig|
- void signal(int sig = 1);
+ // When |flush| is true, the caller must call bthread_flush() after signal().
+ void signal(int sig = 1, bool flush = false);
// Block current thread until the counter reaches 0.
// Returns 0 on success, error code otherwise.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]