This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e59ebc Fix butex wait_pthread handle EINTR (#2086)
32e59ebc is described below

commit 32e59ebcd9725a6d69416b4c519116404acbab9f
Author: Jenrry You <[email protected]>
AuthorDate: Fri May 5 14:47:19 2023 +0800

    Fix butex wait_pthread handle EINTR (#2086)
---
 src/bthread/butex.cpp           | 59 ++++++++++++++++++++---------------------
 test/bthread_butex_unittest.cpp | 49 ++++++++++++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 30 deletions(-)

diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 757b3143..19b03725 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -137,27 +137,41 @@ static void wakeup_pthread(ButexPthreadWaiter* pw) {
 
 bool erase_from_butex(ButexWaiter*, bool, WaiterState);
 
-int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
+int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) {
+    timespec* ptimeout = NULL;
+    timespec timeout;
+    int64_t timeout_us = 0;
+    int rc;
+
     while (true) {
-        const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, 
ptimeout);
-        if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) 
{
-            // If `sig' is changed, wakeup_pthread() must be called and `pw'
-            // is already removed from the butex.
-            // Acquire fence makes this thread sees changes before wakeup.
-            return rc;
+        if (abstime != NULL) {
+            timeout_us = butil::timespec_to_microseconds(*abstime) - 
butil::gettimeofday_us();
+            timeout = butil::microseconds_to_timespec(timeout_us);
+            ptimeout = &timeout;
+        }
+        if (timeout_us > MIN_SLEEP_US || abstime == NULL) {
+            rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
+            if (PTHREAD_NOT_SIGNALLED != 
pw.sig.load(butil::memory_order_acquire)) {
+                // If `sig' is changed, wakeup_pthread() must be called and 
`pw'
+                // is already removed from the butex.
+                // Acquire fence makes this thread sees changes before wakeup.
+                return rc;
+            }
+        } else {
+            errno = ETIMEDOUT;
+            rc = -1;
         }
+        // Handle ETIMEDOUT when abstime is valid.
+        // If futex_wait_private return EINTR, just continue the loop.
         if (rc != 0 && errno == ETIMEDOUT) {
-            // Note that we don't handle the EINTR from futex_wait here since
-            // pthreads waiting on a butex should behave similarly as bthreads
-            // which are not able to be woken-up by signals.
-            // EINTR on butex is only producible by TaskGroup::interrupt().
-
-            // `pw' is still in the queue, remove it.
+            // wait futex timeout, `pw' is still in the queue, remove it.
             if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
                 // Another thread is erasing `pw' as well, wait for the signal.
                 // Acquire fence makes this thread sees changes before wakeup.
                 if (pw.sig.load(butil::memory_order_acquire) == 
PTHREAD_NOT_SIGNALLED) {
-                    ptimeout = NULL; // already timedout, ptimeout is expired.
+                    // already timedout, abstime and ptimeout are expired.
+                    abstime = NULL;
+                    ptimeout = NULL;
                     continue;
                 }
             }
@@ -567,21 +581,6 @@ static void wait_for_butex(void* arg) {
 
 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                    const timespec* abstime) {
-    // sys futex needs relative timeout.
-    // Compute diff between abstime and now.
-    timespec* ptimeout = NULL;
-    timespec timeout;
-    if (abstime != NULL) {
-        const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
-            butil::gettimeofday_us();
-        if (timeout_us < MIN_SLEEP_US) {
-            errno = ETIMEDOUT;
-            return -1;
-        }
-        timeout = butil::microseconds_to_timespec(timeout_us);
-        ptimeout = &timeout;
-    }
-
     TaskMeta* task = NULL;
     ButexPthreadWaiter pw;
     pw.tid = 0;
@@ -612,7 +611,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, 
int expected_value,
         bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
         num_waiters << 1;
 #endif
-        rc = wait_pthread(pw, ptimeout);
+        rc = wait_pthread(pw, abstime);
 #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
         num_waiters << -1;
 #endif
diff --git a/test/bthread_butex_unittest.cpp b/test/bthread_butex_unittest.cpp
index eb991fd0..8f2f4f5f 100644
--- a/test/bthread_butex_unittest.cpp
+++ b/test/bthread_butex_unittest.cpp
@@ -25,6 +25,7 @@
 #include "bthread/task_group.h"
 #include "bthread/bthread.h"
 #include "bthread/unstable.h"
+#include "bthread/interrupt_pthread.h"
 
 namespace bthread {
 extern butil::atomic<TaskControl*> g_task_control;
@@ -394,4 +395,52 @@ TEST(ButexTest, stop_before_sleeping) {
         ASSERT_EQ(EINVAL, bthread_stop(th));
     }
 }
+
+void* trigger_signal(void* arg) {
+    pthread_t * th = (pthread_t*)arg;
+    const long t1 = butil::gettimeofday_us();
+    for (size_t i = 0; i < 50; ++i) {
+      usleep(100000);
+      if (bthread::interrupt_pthread(*th) == ESRCH) {
+        LOG(INFO) << "waiter thread end, trigger count=" << i;
+        break;
+      }
+    }
+    const long t2 = butil::gettimeofday_us();
+    LOG(INFO) << "trigger signal thread end, elapsed=" << (t2-t1) << "us";
+    return NULL;
+}
+
+TEST(ButexTest, wait_with_signal_triggered) {
+    butil::Timer tm;
+
+    const int64_t WAIT_MSEC = 500;
+    WaiterArg waiter_args;
+    pthread_t waiter_th, tigger_th;
+    butil::atomic<int>* butex =
+        bthread::butex_create_checked<butil::atomic<int> >();
+    ASSERT_TRUE(butex);
+    *butex = 1;
+    ASSERT_EQ(0, bthread::butex_wake(butex));
+
+    const timespec abstime = butil::milliseconds_from_now(WAIT_MSEC);
+    waiter_args.expected_value = *butex;
+    waiter_args.butex = butex;
+    waiter_args.expected_result = ETIMEDOUT;
+    waiter_args.ptimeout = &abstime;
+    tm.start();
+    pthread_create(&waiter_th, NULL, waiter, &waiter_args);
+    pthread_create(&tigger_th, NULL, trigger_signal, &waiter_th);
+    
+    ASSERT_EQ(0, pthread_join(waiter_th, NULL));
+    tm.stop();
+    auto wait_elapsed_ms = tm.m_elapsed();;
+    LOG(INFO) << "waiter thread end, elapsed " << wait_elapsed_ms << " ms";
+
+    ASSERT_LT(labs(wait_elapsed_ms - WAIT_MSEC), 250);
+
+    ASSERT_EQ(0, pthread_join(tigger_th, NULL));
+    bthread::butex_destroy(butex);
+}
+
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to