This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 95591ce49a [refactor](cv)wait on condition variable more gently (#12620)
95591ce49a is described below
commit 95591ce49a3215d3223748ed219b2441481b4678
Author: starocean999 <[email protected]>
AuthorDate: Tue Nov 8 08:40:31 2022 +0800
[refactor](cv)wait on condition variable more gently (#12620)
---
be/src/agent/task_worker_pool.cpp | 85 ++++++++-----------
be/src/util/blocking_priority_queue.hpp | 135 ++++++++++++++-----------------
be/src/util/blocking_queue.hpp | 58 +++++--------
be/src/util/threadpool.cpp | 17 ++--
be/src/vec/exec/scan/scanner_context.cpp | 15 ++--
5 files changed, 127 insertions(+), 183 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 5e482ef90f..3f5ca98021 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -341,9 +341,8 @@ void
TaskWorkerPool::_create_tablet_worker_thread_callback() {
TCreateTabletReq create_tablet_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -416,9 +415,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
TDropTabletReq drop_tablet_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -469,9 +467,8 @@ void TaskWorkerPool::_alter_tablet_worker_thread_callback()
{
TAgentTaskRequest agent_task_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -592,9 +589,8 @@ void TaskWorkerPool::_push_worker_thread_callback() {
int32_t index = 0;
do {
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -664,9 +660,8 @@ void
TaskWorkerPool::_publish_version_worker_thread_callback() {
TPublishVersionRequest publish_version_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -773,9 +768,8 @@ void
TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
TClearTransactionTaskRequest clear_transaction_task_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -826,9 +820,8 @@ void
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
TUpdateTabletMetaInfoReq update_tablet_meta_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -902,9 +895,8 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -952,9 +944,8 @@ void
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
TStorageMediumMigrateReq storage_medium_migrate_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1054,9 +1045,8 @@ void
TaskWorkerPool::_check_consistency_worker_thread_callback() {
TCheckConsistencyReq check_consistency_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1248,9 +1238,8 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
TUploadReq upload_request;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1300,9 +1289,8 @@ void TaskWorkerPool::_download_worker_thread_callback() {
TDownloadReq download_request;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1353,9 +1341,8 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
TSnapshotRequest snapshot_request;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1413,9 +1400,8 @@ void TaskWorkerPool::_release_snapshot_thread_callback() {
TReleaseSnapshotRequest release_snapshot_request;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1463,9 +1449,8 @@ void TaskWorkerPool::_move_dir_thread_callback() {
TMoveDirReq move_dir_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1571,9 +1556,8 @@ void
TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
@@ -1673,9 +1657,8 @@ void
TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
TGetStoragePolicy get_storage_policy_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
if (!_is_work) {
return;
}
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index 8c264e57dd..29060613c8 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -49,101 +49,90 @@ public:
// -- timeout_ms: 0 means wait indefinitely
bool blocking_get(T* out, uint32_t timeout_ms = 0) {
MonotonicStopWatch timer;
+ timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
-
- while (true) {
- if (!_queue.empty()) {
- // 定期提高队列中残留的任务优先级
- // 保证优先级较低的大查询不至于完全饿死
- if (_upgrade_counter >
config::priority_queue_remaining_tasks_increased_frequency) {
- std::priority_queue<T> tmp_queue;
- while (!_queue.empty()) {
- T v = _queue.top();
- _queue.pop();
- ++v;
- tmp_queue.push(v);
- }
- swap(_queue, tmp_queue);
- _upgrade_counter = 0;
+ bool wait_successful = false;
+ if (timeout_ms > 0) {
+ wait_successful = _get_cv.wait_for(unique_lock,
std::chrono::milliseconds(timeout_ms),
+ [this] { return _shutdown ||
!_queue.empty(); });
+ } else {
+ _get_cv.wait(unique_lock, [this] { return _shutdown ||
!_queue.empty(); });
+ wait_successful = true;
+ }
+ _total_get_wait_time += timer.elapsed_time();
+ if (wait_successful) {
+ if (_upgrade_counter >
config::priority_queue_remaining_tasks_increased_frequency) {
+ std::priority_queue<T> tmp_queue;
+ while (!_queue.empty()) {
+ T v = _queue.top();
+ _queue.pop();
+ ++v;
+ tmp_queue.push(v);
}
+ swap(_queue, tmp_queue);
+ _upgrade_counter = 0;
+ }
+ if (!_queue.empty()) {
*out = _queue.top();
_queue.pop();
++_upgrade_counter;
- _total_get_wait_time += timer.elapsed_time();
- unique_lock.unlock();
_put_cv.notify_one();
return true;
- }
- if (_shutdown) {
- return false;
- }
-
- timer.start();
- if (timeout_ms != 0) {
- if (_get_cv.wait_for(unique_lock,
std::chrono::milliseconds(timeout_ms)) ==
- std::cv_status::timeout) {
- return false;
- }
} else {
- _get_cv.wait(unique_lock);
+ assert(_shutdown);
+ return false;
}
- timer.stop();
+ } else {
+ //time out
+ assert(!_shutdown);
+ return false;
}
}
bool non_blocking_get(T* out) {
MonotonicStopWatch timer;
+ timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
- while (true) {
- if (!_queue.empty()) {
- // 定期提高队列中残留的任务优先级
- // 保证优先级较低的大查询不至于完全饿死
- if (_upgrade_counter >
config::priority_queue_remaining_tasks_increased_frequency) {
- std::priority_queue<T> tmp_queue;
- while (!_queue.empty()) {
- T v = _queue.top();
- _queue.pop();
- ++v;
- tmp_queue.push(v);
- }
- swap(_queue, tmp_queue);
- _upgrade_counter = 0;
+ if (!_queue.empty()) {
+ // 定期提高队列中残留的任务优先级
+ // 保证优先级较低的大查询不至于完全饿死
+ if (_upgrade_counter >
config::priority_queue_remaining_tasks_increased_frequency) {
+ std::priority_queue<T> tmp_queue;
+ while (!_queue.empty()) {
+ T v = _queue.top();
+ _queue.pop();
+ ++v;
+ tmp_queue.push(v);
}
- *out = _queue.top();
- _queue.pop();
- ++_upgrade_counter;
- _total_get_wait_time += timer.elapsed_time();
- unique_lock.unlock();
- _put_cv.notify_one();
- return true;
+ swap(_queue, tmp_queue);
+ _upgrade_counter = 0;
}
- if (_shutdown) {
- return false;
- }
- return false;
+ *out = _queue.top();
+ _queue.pop();
+ ++_upgrade_counter;
+ _total_get_wait_time += timer.elapsed_time();
+ _put_cv.notify_one();
+ return true;
}
+
+ return false;
}
// Puts an element into the queue, waiting indefinitely until there is
space.
// If the queue is shut down, returns false.
bool blocking_put(const T& val) {
MonotonicStopWatch timer;
+ timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
-
- while (_queue.size() >= _max_element && !_shutdown) {
- timer.start();
- _put_cv.wait(unique_lock);
- timer.stop();
- }
+ _put_cv.wait(unique_lock, [this] { return _shutdown || _queue.size() <
_max_element; });
_total_put_wait_time += timer.elapsed_time();
+
if (_shutdown) {
return false;
}
- DCHECK_LT(_queue.size(), _max_element);
_queue.push(val);
- unique_lock.unlock();
_get_cv.notify_one();
return true;
}
@@ -151,7 +140,7 @@ public:
// Shut down the queue. Wakes up all threads waiting on blocking_get or
blocking_put.
void shutdown() {
{
- std::unique_lock<std::mutex> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_shutdown = true;
}
_get_cv.notify_all();
@@ -159,24 +148,18 @@ public:
}
uint32_t get_size() const {
- std::unique_lock<std::mutex> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _queue.size();
}
// Returns the total amount of time threads have blocked in blocking_get.
- uint64_t total_get_wait_time() const {
- std::lock_guard<std::mutex> guard(_lock);
- return _total_get_wait_time;
- }
+ uint64_t total_get_wait_time() const { return _total_get_wait_time; }
// Returns the total amount of time threads have blocked in blocking_put.
- uint64_t total_put_wait_time() const {
- std::lock_guard<std::mutex> guard(_lock);
- return _total_put_wait_time;
- }
+ uint64_t total_put_wait_time() const { return _total_put_wait_time; }
private:
- std::atomic<bool> _shutdown;
+ bool _shutdown;
const int _max_element;
std::condition_variable _get_cv; // 'get' callers wait on this
std::condition_variable _put_cv; // 'put' callers wait on this
@@ -184,8 +167,8 @@ private:
mutable std::mutex _lock;
std::priority_queue<T> _queue;
int _upgrade_counter;
- uint64_t _total_get_wait_time;
- uint64_t _total_put_wait_time;
+ std::atomic<uint64_t> _total_get_wait_time;
+ std::atomic<uint64_t> _total_put_wait_time;
};
} // namespace doris
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 7ba33c37f1..dff6911edf 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -22,6 +22,7 @@
#include <unistd.h>
+#include <atomic>
#include <condition_variable>
#include <list>
#include <mutex>
@@ -47,25 +48,19 @@ public:
// are no more elements available.
bool blocking_get(T* out) {
MonotonicStopWatch timer;
+ timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
-
- while (true) {
- if (!_list.empty()) {
- *out = _list.front();
- _list.pop_front();
- _total_get_wait_time += timer.elapsed_time();
- unique_lock.unlock();
- _put_cv.notify_one();
- return true;
- }
-
- if (_shutdown) {
- return false;
- }
-
- timer.start();
- _get_cv.wait(unique_lock);
- timer.stop();
+ _get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty();
});
+ _total_get_wait_time += timer.elapsed_time();
+
+ if (!_list.empty()) {
+ *out = _list.front();
+ _list.pop_front();
+ _put_cv.notify_one();
+ return true;
+ } else {
+ assert(_shutdown);
+ return false;
}
}
@@ -104,23 +99,16 @@ public:
// If the queue is shut down, returns false.
bool blocking_put(const T& val) {
MonotonicStopWatch timer;
+ timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
-
- while (_list.size() >= _max_elements && !_shutdown) {
- timer.start();
- _put_cv.wait(unique_lock);
- timer.stop();
- }
-
+ _put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() <
_max_elements; });
_total_put_wait_time += timer.elapsed_time();
if (_shutdown) {
return false;
}
- DCHECK_LT(_list.size(), _max_elements);
_list.push_back(val);
- unique_lock.unlock();
_get_cv.notify_one();
return true;
}
@@ -137,21 +125,15 @@ public:
}
uint32_t get_size() const {
- std::unique_lock<std::mutex> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _list.size();
}
// Returns the total amount of time threads have blocked in BlockingGet.
- uint64_t total_get_wait_time() const {
- std::lock_guard<std::mutex> guard(_lock);
- return _total_get_wait_time;
- }
+ uint64_t total_get_wait_time() const { return _total_get_wait_time; }
// Returns the total amount of time threads have blocked in BlockingPut.
- uint64_t total_put_wait_time() const {
- std::lock_guard<std::mutex> guard(_lock);
- return _total_put_wait_time;
- }
+ uint64_t total_put_wait_time() const { return _total_put_wait_time; }
private:
uint32_t SizeLocked(const std::unique_lock<std::mutex>& lock) const {
@@ -167,8 +149,8 @@ private:
// _lock guards access to _list, total_get_wait_time, and
total_put_wait_time
mutable std::mutex _lock;
std::list<T> _list;
- uint64_t _total_get_wait_time;
- uint64_t _total_put_wait_time;
+ std::atomic<uint64_t> _total_get_wait_time;
+ std::atomic<uint64_t> _total_put_wait_time;
};
} // namespace doris
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 881984ba38..76bc7e2171 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -148,9 +148,7 @@ void ThreadPoolToken::shutdown() {
case State::QUIESCING:
// The token is already quiescing. Just wait for a worker thread to
// switch it to QUIESCED.
- while (state() != State::QUIESCED) {
- _not_running_cond.wait(l);
- }
+ _not_running_cond.wait(l, [this]() { return state() ==
State::QUIESCED; });
break;
default:
break;
@@ -160,9 +158,7 @@ void ThreadPoolToken::shutdown() {
void ThreadPoolToken::wait() {
std::unique_lock<std::mutex> l(_pool->_lock);
_pool->check_not_pool_thread_unlocked();
- while (is_active()) {
- _not_running_cond.wait(l);
- }
+ _not_running_cond.wait(l, [this]() { return !is_active(); });
}
void ThreadPoolToken::transition(State new_state) {
@@ -320,9 +316,8 @@ void ThreadPool::shutdown() {
_idle_threads.front().not_empty.notify_one();
_idle_threads.pop_front();
}
- while (_num_threads + _num_threads_pending_start > 0) {
- _no_threads_cond.wait(l);
- }
+
+ _no_threads_cond.wait(l, [this]() { return _num_threads +
_num_threads_pending_start == 0; });
// All the threads have exited. Check the state of each token.
for (auto* t : _tokens) {
@@ -465,9 +460,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r,
ThreadPoolToken* token
void ThreadPool::wait() {
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
- while (_total_queued_tasks > 0 || _active_threads > 0) {
- _idle_cond.wait(l);
- }
+ _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 &&
_active_threads == 0; });
}
void ThreadPool::dispatch_thread() {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 45e106039f..7f6cd847bb 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -116,13 +116,16 @@ Status
ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
_state->exec_env()->scanner_scheduler()->submit(this);
}
// Wait for block from queue
- while (_process_status.ok() && !_is_finished && blocks_queue.empty()) {
- if (_state->is_cancelled()) {
- _process_status = Status::Cancelled("cancelled");
- break;
- }
+ {
SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
- _blocks_queue_added_cv.wait_for(l, std::chrono::seconds(1));
+ _blocks_queue_added_cv.wait(l, [this]() {
+ return !blocks_queue.empty() || _is_finished ||
!_process_status.ok() ||
+ _state->is_cancelled();
+ });
+ }
+
+ if (_state->is_cancelled()) {
+ _process_status = Status::Cancelled("cancelled");
}
if (!_process_status.ok()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]