This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 341ad4d6 butex_wake* support nosignal flag, use bthread_flush signal batch (#1751)
341ad4d6 is described below

commit 341ad4d67606e57cd24260796a32dd72a1896ce2
Author: Yang,Liming <[email protected]>
AuthorDate: Mon Jun 6 15:33:10 2022 +0800

    butex_wake* support nosignal flag, use bthread_flush signal batch (#1751)
---
 src/bthread/butex.cpp           | 44 +++++++++++++++++++++++++++++------------
 src/bthread/butex.h             |  4 ++--
 src/bthread/countdown_event.cpp |  4 ++--
 src/bthread/countdown_event.h   |  3 ++-
 4 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index f8d947d0..cc43702a 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -166,6 +166,7 @@ int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
 }
 
 extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
+extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group_nosignal;
 
 // Returns 0 when no need to unschedule or successfully unscheduled,
 // -1 otherwise.
@@ -256,12 +257,29 @@ void butex_destroy(void* butex) {
     butil::return_object(b);
 }
 
-inline TaskGroup* get_task_group(TaskControl* c) {
-    TaskGroup* g = tls_task_group;
-    return g ? g : c->choose_one_group();
+inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
+    TaskGroup* g;
+    if (nosignal) {
+        g = tls_task_group_nosignal;
+        if (NULL == g) {
+            g = c->choose_one_group();
+            tls_task_group_nosignal = g;
+        }
+    } else {
+        g = tls_task_group ? tls_task_group : c->choose_one_group();
+    }
+    return g;
+}
+
+inline void run_in_local_task_group(TaskGroup* g, bthread_t tid, bool nosignal) {
+    if (!nosignal) {
+        TaskGroup::exchange(&g, tid);
+    } else {
+        g->ready_to_run(tid, nosignal);
+    }
 }
 
-int butex_wake(void* arg) {
+int butex_wake(void* arg, bool nosignal) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
     ButexWaiter* front = NULL;
     {
@@ -279,16 +297,16 @@ int butex_wake(void* arg) {
     }
     ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
     unsleep_if_necessary(bbw, get_global_timer_thread());
-    TaskGroup* g = tls_task_group;
-    if (g) {
-        TaskGroup::exchange(&g, bbw->tid);
+    TaskGroup* g = get_task_group(bbw->control, nosignal);
+    if (g == tls_task_group) {
+        run_in_local_task_group(g, bbw->tid, nosignal);
     } else {
-        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
+        g->ready_to_run_remote(bbw->tid, nosignal);
     }
     return 1;
 }
 
-int butex_wake_all(void* arg) {
+int butex_wake_all(void* arg, bool nosignal) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
 
     ButexWaiterList bthread_waiters;
@@ -324,7 +342,7 @@ int butex_wake_all(void* arg) {
     next->RemoveFromList();
     unsleep_if_necessary(next, get_global_timer_thread());
     ++nwakeup;
-    TaskGroup* g = get_task_group(next->control);
+    TaskGroup* g = get_task_group(next->control, nosignal);
     const int saved_nwakeup = nwakeup;
     while (!bthread_waiters.empty()) {
         // pop reversely
@@ -335,13 +353,13 @@ int butex_wake_all(void* arg) {
         g->ready_to_run_general(w->tid, true);
         ++nwakeup;
     }
-    if (saved_nwakeup != nwakeup) {
+    if (!nosignal && saved_nwakeup != nwakeup) {
         g->flush_nosignal_tasks_general();
     }
     if (g == tls_task_group) {
-        TaskGroup::exchange(&g, next->tid);
+        run_in_local_task_group(g, next->tid, nosignal);
     } else {
-        g->ready_to_run_remote(next->tid);
+        g->ready_to_run_remote(next->tid, nosignal);
     }
     return nwakeup;
 }
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index 4e650914..db4ec341 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -46,11 +46,11 @@ void butex_destroy(void* butex);
 
 // Wake up at most 1 thread waiting on |butex|.
 // Returns # of threads woken up.
-int butex_wake(void* butex);
+int butex_wake(void* butex, bool nosignal = false);
 
 // Wake up all threads waiting on |butex|.
 // Returns # of threads woken up.
-int butex_wake_all(void* butex);
+int butex_wake_all(void* butex, bool nosignal = false);
 
 // Wake up all threads waiting on |butex| except a bthread whose identifier
 // is |excluded_bthread|. This function does not yield.
diff --git a/src/bthread/countdown_event.cpp b/src/bthread/countdown_event.cpp
index 3d0d652d..0d9a93fe 100644
--- a/src/bthread/countdown_event.cpp
+++ b/src/bthread/countdown_event.cpp
@@ -39,7 +39,7 @@ CountdownEvent::~CountdownEvent() {
     butex_destroy(_butex);
 }
 
-void CountdownEvent::signal(int sig) {
+void CountdownEvent::signal(int sig, bool flush) {
     // Have to save _butex, *this is probably defreferenced by the wait thread
     // which sees fetch_sub
     void* const saved_butex = _butex;
@@ -50,7 +50,7 @@ void CountdownEvent::signal(int sig) {
         return;
     }
     LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
-    butex_wake_all(saved_butex);
+    butex_wake_all(saved_butex, flush);
 }
 
 int CountdownEvent::wait() {
diff --git a/src/bthread/countdown_event.h b/src/bthread/countdown_event.h
index d1ee669d..acb205c6 100644
--- a/src/bthread/countdown_event.h
+++ b/src/bthread/countdown_event.h
@@ -39,7 +39,8 @@ public:
     void reset(int v = 1);
 
     // Decrease the counter by |sig|
-    void signal(int sig = 1);
+    // when flush is true, after signal we need to call bthread_flush
+    void signal(int sig = 1, bool flush = false);
 
     // Block current thread until the counter reaches 0.
     // Returns 0 on success, error code otherwise.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to