This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 07083331 Bugfix: TaskTracer deadlocks due to ABA problem (#3115)
07083331 is described below
commit 07083331e1496746f4b1cfa54d0b6364f321be34
Author: Bright Chen <[email protected]>
AuthorDate: Tue Oct 14 14:06:56 2025 +0800
Bugfix: TaskTracer deadlocks due to ABA problem (#3115)
---
src/bthread/task_meta.h | 4 ++
src/bthread/task_tracer.cpp | 92 ++++++++++++--------------------
src/bthread/task_tracer.h | 8 +--
test/brpc_http_rpc_protocol_unittest.cpp | 4 +-
4 files changed, 41 insertions(+), 67 deletions(-)
diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h
index a4ed42bf..1b77c0b6 100644
--- a/src/bthread/task_meta.h
+++ b/src/bthread/task_meta.h
@@ -112,6 +112,8 @@ struct TaskMeta {
TaskStatus status{TASK_STATUS_UNKNOWN};
// Whether bthread is traced?
bool traced{false};
+ // [Not Reset] guarantee tracing completion before jumping.
+ pthread_mutex_t trace_lock{};
// Worker thread id.
pthread_t worker_tid{};
@@ -122,9 +124,11 @@ public:
pthread_spin_init(&version_lock, 0);
version_butex = butex_create_checked<uint32_t>();
*version_butex = 1;
+ pthread_mutex_init(&trace_lock, NULL);
}
~TaskMeta() {
+ pthread_mutex_destroy(&trace_lock);
butex_destroy(version_butex);
version_butex = NULL;
pthread_spin_destroy(&version_lock);
diff --git a/src/bthread/task_tracer.cpp b/src/bthread/task_tracer.cpp
index 415670aa..2602fe31 100644
--- a/src/bthread/task_tracer.cpp
+++ b/src/bthread/task_tracer.cpp
@@ -119,6 +119,7 @@ void TaskTracer::Result::OutputToStream(std::ostream& os) const {
bool TaskTracer::Init() {
if (_trace_time.expose("bthread_trace_time") != 0) {
+ LOG(ERROR) << "Fail to expose bthread_trace_time";
return false;
}
if (!RegisterSignalHandler()) {
@@ -136,7 +137,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
CHECK_NE(TASK_STATUS_RUNNING, s) << "Use `set_running_status' instead";
CHECK_NE(TASK_STATUS_END, s) << "Use `set_end_status_unsafe' instead";
- bool tracing;
+ bool tracing = false;
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (TASK_STATUS_UNKNOWN == m->status && TASK_STATUS_JUMPING == s) {
@@ -182,31 +183,8 @@ void TaskTracer::Trace(std::ostream& os, bthread_t tid) {
}
void TaskTracer::WaitForTracing(TaskMeta* m) {
- BAIDU_SCOPED_LOCK(_mutex);
- while (m->traced) {
- _cond.Wait();
- }
-}
-
-TaskStatus TaskTracer::WaitForJumping(TaskMeta* m) {
- // Reasons for not using locks here:
- // 1. It is necessary to lock before jump_stack, unlock after jump_stack,
- // which involves two different bthread and is prone to errors.
- // 2. jump_stack is fast.
- int i = 0;
- do {
- // The bthread is jumping now, spin until it finishes.
- if (i++ < 30) {
- cpu_relax();
- } else {
- sched_yield();
- }
-
- BAIDU_SCOPED_LOCK(m->version_lock);
- if (TASK_STATUS_JUMPING != m->status) {
- return m->status;
- }
- } while (true);
+ BAIDU_SCOPED_LOCK(m->trace_lock);
+ // Acquiring trace_lock means tracing is done.
}
TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
@@ -224,25 +202,44 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
// Make sure only one bthread is traced at a time.
BAIDU_SCOPED_LOCK(_trace_request_mutex);
+ // The chance to remove unused SignalSyncs.
+ auto iter = std::remove_if(
+ _inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
+ [](butil::intrusive_ptr<SignalSync>& sync) {
+ return sync->ref_count() == 1;
+ });
+ _inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());
+
TaskMeta* m = TaskGroup::address_meta(tid);
if (NULL == m) {
return Result::MakeErrorResult("bthread=%d never existed", tid);
}
- BAIDU_SCOPED_LOCK(_mutex);
+ BAIDU_SCOPED_LOCK(m->trace_lock);
TaskStatus status;
pthread_t worker_tid;
const uint32_t given_version = get_version(tid);
{
BAIDU_SCOPED_LOCK(m->version_lock);
- if (given_version == *m->version_butex) {
- // Start tracing.
- m->traced = true;
- worker_tid = m->worker_tid;
- status = m->status;
- } else {
+ if (given_version != *m->version_butex) {
return Result::MakeErrorResult("bthread=%d not exist now", tid);
}
+
+ status = m->status;
+ if (TASK_STATUS_UNKNOWN == status) {
+ return Result::MakeErrorResult("bthread=%d not exist now", tid);
+ } else if (TASK_STATUS_CREATED == status) {
+ return Result::MakeErrorResult("bthread=%d has just been created", tid);
+ } else if (TASK_STATUS_FIRST_READY == status) {
+ return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid);
+ } else if (TASK_STATUS_END == status) {
+ return Result::MakeErrorResult("bthread=%d has ended", tid);
+ } else if (TASK_STATUS_JUMPING == status) {
+ return Result::MakeErrorResult("bthread=%d is jumping stack", tid);
+ }
+ // Start tracing.
+ m->traced = true;
+ worker_tid = m->worker_tid;
}
BRPC_SCOPE_EXIT {
@@ -252,31 +249,16 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
// tracing completion, so given_version != *m->version_butex is OK.
m->traced = false;
}
- // Wake up the waiting worker thread to jump.
- _cond.Signal();
};
- if (TASK_STATUS_UNKNOWN == status) {
- return Result::MakeErrorResult("bthread=%d not exist now", tid);
- } else if (TASK_STATUS_CREATED == status) {
- return Result::MakeErrorResult("bthread=%d has just been created", tid);
- } else if (TASK_STATUS_FIRST_READY == status) {
- return Result::MakeErrorResult("bthread=%d is scheduled for the first time", tid);
- } else if (TASK_STATUS_END == status) {
- return Result::MakeErrorResult("bthread=%d has ended", tid);
- } else if (TASK_STATUS_JUMPING == status) {
- // Wait for jumping completion.
- status = WaitForJumping(m);
- }
-
- // After jumping, the status may be RUNNING, SUSPENDED, or READY, which is traceable.
+ // The status may be RUNNING, SUSPENDED, or READY, which is traceable.
if (TASK_STATUS_RUNNING == status) {
return SignalTrace(worker_tid);
} else if (TASK_STATUS_SUSPENDED == status || TASK_STATUS_READY == status) {
return ContextTrace(m->stack->context);
}
- return Result::MakeErrorResult("Invalid TaskStatus=%d", status);
+ return Result::MakeErrorResult("Invalid TaskStatus=%d of bthread=%d", status, tid);
}
// Instruct ASan to ignore this function.
@@ -408,14 +390,6 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
return Result::MakeErrorResult("Forbid to trace self");
}
- // Remove unused SignalSyncs.
- auto iter = std::remove_if(
- _inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
- [](butil::intrusive_ptr<SignalSync>& sync) {
- return sync->ref_count() == 1;
- });
- _inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());
-
// Each signal trace has an independent SignalSync to
// prevent the previous SignalHandler from affecting the new SignalTrace.
butil::intrusive_ptr<SignalSync> signal_sync(new SignalSync());
@@ -465,6 +439,8 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
}
break;
}
+ // Remove the successful SignalSync.
+ _inuse_signal_syncs.pop_back();
return signal_sync->result;
}
diff --git a/src/bthread/task_tracer.h b/src/bthread/task_tracer.h
index 0888c658..be95f3ac 100644
--- a/src/bthread/task_tracer.h
+++ b/src/bthread/task_tracer.h
@@ -47,7 +47,7 @@ public:
void Trace(std::ostream& os, bthread_t tid);
// When the worker is jumping stack from a bthread to another,
- void WaitForTracing(TaskMeta* m);
+ static void WaitForTracing(TaskMeta* m);
private:
// Error number guard used in signal handler.
@@ -94,7 +94,6 @@ private:
Result result;
};
- static TaskStatus WaitForJumping(TaskMeta* m);
Result TraceImpl(bthread_t tid);
unw_cursor_t MakeCursor(bthread_fcontext_t fcontext);
@@ -108,11 +107,6 @@ private:
// Make sure only one bthread is traced at a time.
Mutex _trace_request_mutex;
- // For signal trace.
- // Make sure bthread does not jump stack when it is being traced.
- butil::Mutex _mutex;
- butil::ConditionVariable _cond{&_mutex};
-
// For context trace.
unw_context_t _context{};
diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp
index 5a6839c8..f13c6877 100644
--- a/test/brpc_http_rpc_protocol_unittest.cpp
+++ b/test/brpc_http_rpc_protocol_unittest.cpp
@@ -1701,9 +1701,9 @@ TEST_F(HttpTest, spring_protobuf_content_type) {
res.Clear();
cntl2.http_request().set_content_type("application/x-protobuf");
stub.Echo(&cntl2, &req, &res, nullptr);
- ASSERT_FALSE(cntl.Failed());
+ ASSERT_FALSE(cntl2.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
- ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
+ ASSERT_EQ("application/x-protobuf", cntl2.http_response().content_type());
}
TEST_F(HttpTest, dump_http_request) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]