This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 98261cea376 IGNITE-28410 DB API Driver: Fix deadlock in native code (#7909)
98261cea376 is described below
commit 98261cea376e578cfa94a63a250cdce9a40a0401
Author: Dmitriy Zabotlin <[email protected]>
AuthorDate: Tue Mar 31 14:48:57 2026 +0300
IGNITE-28410 DB API Driver: Fix deadlock in native code (#7909)
Co-authored-by: dzabotlin <[email protected]>
---
.../cpp/ignite/common/detail/thread_timer.cpp | 49 ++++++++++++++--------
.../cpp/ignite/common/detail/thread_timer.h | 33 ++++++++++-----
2 files changed, 53 insertions(+), 29 deletions(-)
diff --git a/modules/platforms/cpp/ignite/common/detail/thread_timer.cpp b/modules/platforms/cpp/ignite/common/detail/thread_timer.cpp
index 73fb2716072..f303f860a36 100644
--- a/modules/platforms/cpp/ignite/common/detail/thread_timer.cpp
+++ b/modules/platforms/cpp/ignite/common/detail/thread_timer.cpp
@@ -27,27 +27,31 @@ thread_timer::~thread_timer() {
std::shared_ptr<thread_timer>
thread_timer::start(std::function<void(ignite_error&&)> error_handler) {
std::shared_ptr<thread_timer> res{new thread_timer()};
- res->m_thread = std::thread([&self = *res, error_handler =
std::move(error_handler)]() {
- std::unique_lock<std::mutex> lock(self.m_mutex);
+ res->m_thread = std::thread([state = res->m_state, error_handler =
std::move(error_handler)]() {
+ std::unique_lock<std::mutex> lock(state->m_mutex);
while (true) {
- if (self.m_stopping) {
- self.m_condition.notify_one();
+ if (state->m_stopping) {
+ state->m_condition.notify_one();
return;
}
- if (self.m_events.empty()) {
- self.m_condition.wait(lock);
+ if (state->m_events.empty()) {
+ state->m_condition.wait(lock);
continue;
}
- auto nearest_event_ts = self.m_events.top().timestamp;
+ auto nearest_event_ts = state->m_events.top().timestamp;
auto now = std::chrono::steady_clock::now();
if (nearest_event_ts < now) {
- auto func = self.m_events.top().callback;
- self.m_events.pop();
+ auto func = state->m_events.top().callback;
+ state->m_events.pop();
lock.unlock();
+ // NOTE: invoking func may destroy the thread_timer object
(e.g. when the last
+ // shared_ptr<node_connection> held by the callback is
released, triggering
+ // ~node_connection -> ~thread_timer -> stop()). The
timer_state shared_ptr
+ // captured by this lambda keeps state alive across that
destruction.
auto res = result_of_operation(func);
if (res.has_error()) {
error_handler(res.error());
@@ -55,7 +59,7 @@ std::shared_ptr<thread_timer> thread_timer::start(std::function<void(ignite_erro
lock.lock();
} else {
- self.m_condition.wait_until(lock, nearest_event_ts);
+ state->m_condition.wait_until(lock, nearest_event_ts);
}
}
});
@@ -64,20 +68,29 @@ std::shared_ptr<thread_timer> thread_timer::start(std::function<void(ignite_erro
void thread_timer::stop() {
{
- std::unique_lock<std::mutex> lock(m_mutex);
- if (m_stopping)
+ std::unique_lock<std::mutex> lock(m_state->m_mutex);
+ if (m_state->m_stopping)
return;
- m_stopping = true;
- m_condition.notify_one();
+ m_state->m_stopping = true;
+ m_state->m_condition.notify_one();
+ }
+
+ if (std::this_thread::get_id() == m_thread.get_id()) {
+ // Called from within a timer callback. Joining the current thread
would deadlock, so
+ // detach instead. The timer loop will see m_stopping == true on its
next iteration and
+ // exit cleanly. The timer_state shared_ptr held by the thread lambda
keeps the state
+ // (mutex, condition variable, event queue) alive until the thread
actually terminates.
+ m_thread.detach();
+ } else {
+ m_thread.join();
}
- m_thread.join();
}
void thread_timer::add(std::chrono::milliseconds timeout,
std::function<void()> callback) {
- std::lock_guard<std::mutex> lock(m_mutex);
- m_events.emplace(std::chrono::steady_clock::now() + timeout,
std::move(callback));
- m_condition.notify_one();
+ std::lock_guard<std::mutex> lock(m_state->m_mutex);
+ m_state->m_events.emplace(std::chrono::steady_clock::now() + timeout,
std::move(callback));
+ m_state->m_condition.notify_one();
}
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/common/detail/thread_timer.h b/modules/platforms/cpp/ignite/common/detail/thread_timer.h
index aeb46332431..aff0d5cc6f5 100644
--- a/modules/platforms/cpp/ignite/common/detail/thread_timer.h
+++ b/modules/platforms/cpp/ignite/common/detail/thread_timer.h
@@ -60,6 +60,26 @@ class thread_timer final {
bool operator>(const timed_event& other) const { return timestamp >
other.timestamp; }
};
+ /**
+ * Shared mutable state owned jointly by the thread_timer object and the
timer thread's lambda.
+ * This ensures the state (mutex, condition variable, event queue) remains
valid even if
+ * thread_timer is destroyed from within a timer callback (which would
otherwise cause a
+ * self-join deadlock in stop()).
+ */
+ struct timer_state {
+ /** The stop flag. */
+ bool m_stopping{false};
+
+ /** Mutex. */
+ std::mutex m_mutex;
+
+ /** Conditional variable. */
+ std::condition_variable m_condition;
+
+ /** Timed event queue. */
+ std::priority_queue<timed_event, std::vector<timed_event>,
std::greater<>> m_events;
+ };
+
public:
/**
* Destructor.
@@ -93,20 +113,11 @@ private:
*/
thread_timer() = default;
- /** The stop flag. */
- bool m_stopping{false};
+ /** Shared mutable state (kept alive independently of the thread_timer
object lifetime). */
+ std::shared_ptr<timer_state> m_state{std::make_shared<timer_state>()};
/** Thread. */
std::thread m_thread;
-
- /** Mutex. */
- std::mutex m_mutex;
-
- /** Conditional variable. */
- std::condition_variable m_condition;
-
- /** Timed event. */
- std::priority_queue<timed_event, std::vector<timed_event>, std::greater<>>
m_events;
};
} // namespace ignite::detail