This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 32e59ebc Fix butex wait_pthread handle EINTR (#2086)
32e59ebc is described below
commit 32e59ebcd9725a6d69416b4c519116404acbab9f
Author: Jenrry You <[email protected]>
AuthorDate: Fri May 5 14:47:19 2023 +0800
Fix butex wait_pthread handle EINTR (#2086)
---
src/bthread/butex.cpp | 59 ++++++++++++++++++++---------------------
test/bthread_butex_unittest.cpp | 49 ++++++++++++++++++++++++++++++++++
2 files changed, 78 insertions(+), 30 deletions(-)
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 757b3143..19b03725 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -137,27 +137,41 @@ static void wakeup_pthread(ButexPthreadWaiter* pw) {
bool erase_from_butex(ButexWaiter*, bool, WaiterState);
-int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
+int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) {
+ timespec* ptimeout = NULL;
+ timespec timeout;
+ int64_t timeout_us = 0;
+ int rc;
+
while (true) {
- const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED,
ptimeout);
- if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire))
{
- // If `sig' is changed, wakeup_pthread() must be called and `pw'
- // is already removed from the butex.
- // Acquire fence makes this thread sees changes before wakeup.
- return rc;
+ if (abstime != NULL) {
+ timeout_us = butil::timespec_to_microseconds(*abstime) -
butil::gettimeofday_us();
+ timeout = butil::microseconds_to_timespec(timeout_us);
+ ptimeout = &timeout;
+ }
+ if (timeout_us > MIN_SLEEP_US || abstime == NULL) {
+ rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
+ if (PTHREAD_NOT_SIGNALLED !=
pw.sig.load(butil::memory_order_acquire)) {
+ // If `sig' is changed, wakeup_pthread() must be called and
`pw'
+ // is already removed from the butex.
+ // Acquire fence makes this thread sees changes before wakeup.
+ return rc;
+ }
+ } else {
+ errno = ETIMEDOUT;
+ rc = -1;
}
+ // Handle ETIMEDOUT when abstime is valid.
+ // If futex_wait_private return EINTR, just continue the loop.
if (rc != 0 && errno == ETIMEDOUT) {
- // Note that we don't handle the EINTR from futex_wait here since
- // pthreads waiting on a butex should behave similarly as bthreads
- // which are not able to be woken-up by signals.
- // EINTR on butex is only producible by TaskGroup::interrupt().
-
- // `pw' is still in the queue, remove it.
+ // wait futex timeout, `pw' is still in the queue, remove it.
if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
// Another thread is erasing `pw' as well, wait for the signal.
// Acquire fence makes this thread sees changes before wakeup.
if (pw.sig.load(butil::memory_order_acquire) ==
PTHREAD_NOT_SIGNALLED) {
- ptimeout = NULL; // already timedout, ptimeout is expired.
+ // already timedout, abstime and ptimeout are expired.
+ abstime = NULL;
+ ptimeout = NULL;
continue;
}
}
@@ -567,21 +581,6 @@ static void wait_for_butex(void* arg) {
static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
const timespec* abstime) {
- // sys futex needs relative timeout.
- // Compute diff between abstime and now.
- timespec* ptimeout = NULL;
- timespec timeout;
- if (abstime != NULL) {
- const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
- butil::gettimeofday_us();
- if (timeout_us < MIN_SLEEP_US) {
- errno = ETIMEDOUT;
- return -1;
- }
- timeout = butil::microseconds_to_timespec(timeout_us);
- ptimeout = &timeout;
- }
-
TaskMeta* task = NULL;
ButexPthreadWaiter pw;
pw.tid = 0;
@@ -612,7 +611,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b,
int expected_value,
bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
num_waiters << 1;
#endif
- rc = wait_pthread(pw, ptimeout);
+ rc = wait_pthread(pw, abstime);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
num_waiters << -1;
#endif
diff --git a/test/bthread_butex_unittest.cpp b/test/bthread_butex_unittest.cpp
index eb991fd0..8f2f4f5f 100644
--- a/test/bthread_butex_unittest.cpp
+++ b/test/bthread_butex_unittest.cpp
@@ -25,6 +25,7 @@
#include "bthread/task_group.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
+#include "bthread/interrupt_pthread.h"
namespace bthread {
extern butil::atomic<TaskControl*> g_task_control;
@@ -394,4 +395,52 @@ TEST(ButexTest, stop_before_sleeping) {
ASSERT_EQ(EINVAL, bthread_stop(th));
}
}
+
+void* trigger_signal(void* arg) {
+ pthread_t * th = (pthread_t*)arg;
+ const long t1 = butil::gettimeofday_us();
+ for (size_t i = 0; i < 50; ++i) {
+ usleep(100000);
+ if (bthread::interrupt_pthread(*th) == ESRCH) {
+ LOG(INFO) << "waiter thread end, trigger count=" << i;
+ break;
+ }
+ }
+ const long t2 = butil::gettimeofday_us();
+ LOG(INFO) << "trigger signal thread end, elapsed=" << (t2-t1) << "us";
+ return NULL;
+}
+
+TEST(ButexTest, wait_with_signal_triggered) {
+ butil::Timer tm;
+
+ const int64_t WAIT_MSEC = 500;
+ WaiterArg waiter_args;
+ pthread_t waiter_th, tigger_th;
+ butil::atomic<int>* butex =
+ bthread::butex_create_checked<butil::atomic<int> >();
+ ASSERT_TRUE(butex);
+ *butex = 1;
+ ASSERT_EQ(0, bthread::butex_wake(butex));
+
+ const timespec abstime = butil::milliseconds_from_now(WAIT_MSEC);
+ waiter_args.expected_value = *butex;
+ waiter_args.butex = butex;
+ waiter_args.expected_result = ETIMEDOUT;
+ waiter_args.ptimeout = &abstime;
+ tm.start();
+ pthread_create(&waiter_th, NULL, waiter, &waiter_args);
+ pthread_create(&tigger_th, NULL, trigger_signal, &waiter_th);
+
+ ASSERT_EQ(0, pthread_join(waiter_th, NULL));
+ tm.stop();
+ auto wait_elapsed_ms = tm.m_elapsed();;
+ LOG(INFO) << "waiter thread end, elapsed " << wait_elapsed_ms << " ms";
+
+ ASSERT_LT(labs(wait_elapsed_ms - WAIT_MSEC), 250);
+
+ ASSERT_EQ(0, pthread_join(tigger_th, NULL));
+ bthread::butex_destroy(butex);
+}
+
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]