This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ced60cb9b51937b3ab40efcbba4016782357e895 Author: Pxl <[email protected]> AuthorDate: Sat Aug 5 21:24:32 2023 +0800 [Bug](brpc) fix brpc failed on big query came concurrently (#22600) fix PriorityThreadPool get_info get wrong number change brpc pool from priority to fifo do not use brpc pool when send eos --- be/src/olap/olap_server.cpp | 2 +- be/src/olap/storage_engine.cpp | 2 +- be/src/olap/storage_engine.h | 1 - be/src/runtime/routine_load/data_consumer_group.h | 2 +- .../routine_load/routine_load_task_executor.h | 2 +- be/src/service/internal_service.cpp | 13 +++++++--- be/src/service/internal_service.h | 6 ++--- be/src/util/async_io.h | 2 +- be/src/util/blocking_priority_queue.hpp | 2 +- be/src/util/blocking_queue.hpp | 28 +++++++++++++++++----- be/src/util/threadpool.h | 2 +- ...iority_thread_pool.hpp => work_thread_pool.hpp} | 28 +++++++++++++--------- be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +- be/src/vec/exec/scan/scanner_scheduler.h | 1 - be/test/olap/skiplist_test.cpp | 2 +- 15 files changed, 61 insertions(+), 34 deletions(-) diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index dc894419a9..0e47cf224e 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -73,12 +73,12 @@ #include "util/countdown_latch.h" #include "util/doris_metrics.h" #include "util/mem_info.h" -#include "util/priority_thread_pool.hpp" #include "util/thread.h" #include "util/threadpool.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" #include "util/uid_util.h" +#include "util/work_thread_pool.hpp" using std::string; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 340f02b65b..ee7c9eb5d6 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -73,13 +73,13 @@ #include "runtime/stream_load/stream_load_recorder.h" #include "util/doris_metrics.h" #include "util/metrics.h" -#include "util/priority_thread_pool.hpp" #include "util/spinlock.h" #include "util/stopwatch.hpp" #include "util/thread.h" #include "util/threadpool.h" #include "util/trace.h" #include "util/uid_util.h" +#include "util/work_thread_pool.hpp" using std::filesystem::directory_iterator; using std::filesystem::path; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 25f565f031..f02cd69816 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -61,7 +61,6 @@ class CumulativeCompaction; class SingleReplicaCompaction; class CumulativeCompactionPolicy; class MemTracker; -class PriorityThreadPool; class StreamLoadRecorder; class TCloneReq; class TCreateTabletReq; diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index 6ede3d9f66..e7be39f5a6 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -27,8 +27,8 @@ #include "common/status.h" #include "runtime/routine_load/data_consumer.h" #include "util/blocking_queue.hpp" -#include "util/priority_thread_pool.hpp" #include "util/uid_util.h" +#include "util/work_thread_pool.hpp" namespace RdKafka { class Message; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index b2ddfae2b7..90c1a06400 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -27,8 +27,8 @@ #include <vector> #include "runtime/routine_load/data_consumer_pool.h" -#include "util/priority_thread_pool.hpp" #include "util/uid_util.h" +#include "util/work_thread_pool.hpp" namespace doris { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 821e937fca..526507d00b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -172,7 +172,7 @@ template <typename T> concept CanCancel = requires(T* response) { response->mutable_status(); }; template <CanCancel T> -void offer_failed(T* response, google::protobuf::Closure* done, const PriorityThreadPool& pool) { +void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(TStatusCode::CANCELLED); response->mutable_status()->add_error_msgs("fail to offer request to the work pool, pool=" + @@ -180,7 +180,7 @@ void offer_failed(T* response, google::protobuf::Closure* done, const PriorityTh } template <typename T> -void offer_failed(T* response, google::protobuf::Closure* done, const PriorityThreadPool& pool) { +void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { brpc::ClosureGuard closure_guard(done); LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info(); } @@ -983,7 +983,14 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr google::protobuf::Closure* done) { int64_t receive_time = GetCurrentTimeNanos(); response->set_receive_time(receive_time); - PriorityThreadPool& pool = request->has_block() ? _heavy_work_pool : _light_work_pool; + + if (!request->has_block() && config::brpc_light_work_pool_threads == -1) { + // under high concurrency, thread pool will have a lot of lock contention. + _transmit_block(controller, request, response, done, Status::OK()); + return; + } + + FifoThreadPool& pool = request->has_block() ? _heavy_work_pool : _light_work_pool; bool ret = pool.try_offer([this, controller, request, response, done]() { _transmit_block(controller, request, response, done, Status::OK()); }); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 823f29504b..aa30959ca3 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -23,7 +23,7 @@ #include <string> #include "common/status.h" -#include "util/priority_thread_pool.hpp" +#include "util/work_thread_pool.hpp" namespace google { namespace protobuf { @@ -227,8 +227,8 @@ private: // the reason see issue #16634 // define the interface for reading and writing data as heavy interface // otherwise as light interface - PriorityThreadPool _heavy_work_pool; - PriorityThreadPool _light_work_pool; + FifoThreadPool _heavy_work_pool; + FifoThreadPool _light_work_pool; }; } // namespace doris diff --git a/be/src/util/async_io.h b/be/src/util/async_io.h index fce50e79b7..9330b88d36 100644 --- a/be/src/util/async_io.h +++ b/be/src/util/async_io.h @@ -21,7 +21,7 @@ #include "io/fs/file_system.h" #include "olap/olap_define.h" -#include "priority_thread_pool.hpp" +#include "work_thread_pool.hpp" namespace doris { diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 41196c5cfb..92d6875135 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -185,7 +185,7 @@ public: return _queue.size(); } - uint32_t get_max_size() const { return _max_element; } + uint32_t get_capacity() const { return _max_element; } // Returns the total amount of time threads have blocked in blocking_get. uint64_t total_get_wait_time() const { return _total_get_wait_time; } diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp index 35cd74ee89..0b66220a37 100644 --- a/be/src/util/blocking_queue.hpp +++ b/be/src/util/blocking_queue.hpp @@ -82,6 +82,26 @@ public: return true; } + // Return false if queue full or has been shutdown. + bool try_put(const T& val) { + if (_shutdown || _list.size() >= _max_elements) { + return false; + } + + MonotonicStopWatch timer; + timer.start(); + std::unique_lock<std::mutex> unique_lock(_lock); + _total_put_wait_time += timer.elapsed_time(); + + if (_shutdown || _list.size() >= _max_elements) { + return false; + } + + _list.push_back(val); + _get_cv.notify_one(); + return true; + } + // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut. void shutdown() { { @@ -98,6 +118,8 @@ public: return _list.size(); } + uint32_t get_capacity() const { return _max_elements; } + // Returns the total amount of time threads have blocked in BlockingGet. uint64_t total_get_wait_time() const { return _total_get_wait_time; } @@ -105,12 +127,6 @@ public: uint64_t total_put_wait_time() const { return _total_put_wait_time; } private: - uint32_t SizeLocked(const std::unique_lock<std::mutex>& lock) const { - // The size of 'get_list_' is read racily to avoid getting 'get_lock_' in write path. - DCHECK(lock.owns_lock()); - return _list.size(); - } - bool _shutdown; const int _max_elements; std::condition_variable _get_cv; // 'get' callers wait on this diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index d1b744b63e..3bb0a76c1c 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -38,13 +38,13 @@ #include <unordered_set> #include "common/status.h" +#include "util/work_thread_pool.hpp" namespace doris { class Thread; class ThreadPool; class ThreadPoolToken; -class PriorityThreadPool; class Runnable { public: diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/work_thread_pool.hpp similarity index 88% rename from be/src/util/priority_thread_pool.hpp rename to be/src/util/work_thread_pool.hpp index 8f648d9d37..109ae6ded8 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/work_thread_pool.hpp @@ -21,6 +21,7 @@ #include <thread> #include "util/blocking_priority_queue.hpp" +#include "util/blocking_queue.hpp" #include "util/lock.h" #include "util/thread.h" #include "util/thread_group.h" @@ -29,7 +30,8 @@ namespace doris { // Simple threadpool which processes items (of type T) in parallel which were placed on a // blocking queue by Offer(). Each item is processed by a single user-supplied method. -class PriorityThreadPool { +template <bool Priority = false> +class WorkThreadPool { public: // Signature of a work-processing function. Takes the integer id of the thread which is // calling it (ids run from 0 to num_threads - 1) and a reference to the item to @@ -49,22 +51,25 @@ public: } }; + using WorkQueue = + std::conditional_t<Priority, BlockingPriorityQueue<Task>, BlockingQueue<Task>>; + // Creates a new thread pool and start num_threads threads. // -- num_threads: how many threads are part of this pool // -- queue_size: the maximum size of the queue on which work items are offered. If the // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. - PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name) + WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name) : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { for (int i = 0; i < num_threads; ++i) { _threads.create_thread( - std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i)); + std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), this, i)); } } // Destructor ensures that all threads are terminated before this object is freed // (otherwise they may continue to run and reference member variables) - virtual ~PriorityThreadPool() { + virtual ~WorkThreadPool() { shutdown(); join(); } @@ -83,12 +88,12 @@ public: virtual bool offer(Task task) { return _work_queue.blocking_put(task); } virtual bool offer(WorkFunction func) { - PriorityThreadPool::Task task = {0, func, 0}; + WorkThreadPool::Task task = {0, func, 0}; return _work_queue.blocking_put(task); } virtual bool try_offer(WorkFunction func) { - PriorityThreadPool::Task task = {0, func, 0}; + WorkThreadPool::Task task = {0, func, 0}; return _work_queue.try_put(task); } @@ -126,8 +131,8 @@ public: return fmt::format( "PriorityThreadPool(name={}, queue_size={}/{}, active_thread={}/{}, " "total_get_wait_time={}, total_put_wait_time={})", - _name, get_queue_size(), _work_queue.get_size(), _work_queue.get_max_size(), - _active_threads, _threads.size(), _work_queue.total_get_wait_time(), + _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, + _threads.size(), _work_queue.total_get_wait_time(), _work_queue.total_put_wait_time()); } @@ -161,9 +166,7 @@ private: } } - // Queue on which work items are held until a thread is available to process them in - // FIFO order. - BlockingPriorityQueue<Task> _work_queue; + WorkQueue _work_queue; // Set to true when threads should stop doing work and terminate. std::atomic<bool> _shutdown; @@ -171,4 +174,7 @@ private: std::atomic<int> _active_threads; }; +using PriorityThreadPool = WorkThreadPool<true>; +using FifoThreadPool = WorkThreadPool<false>; + } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 2cc228bedb..632e28e00a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -40,10 +40,10 @@ #include "util/blocking_queue.hpp" #include "util/cpu_info.h" #include "util/defer_op.h" -#include "util/priority_thread_pool.hpp" #include "util/priority_work_stealing_thread_pool.hpp" #include "util/thread.h" #include "util/threadpool.h" +#include "util/work_thread_pool.hpp" #include "vec/core/block.h" #include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep #include "vec/exec/scan/scanner_context.h" diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 630e3a3803..8c66814bf5 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -26,7 +26,6 @@ namespace doris { class ExecEnv; -class PriorityThreadPool; namespace vectorized { class VScanner; diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp index 667ad6ba4c..55c1b28bc5 100644 --- a/be/test/olap/skiplist_test.cpp +++ b/be/test/olap/skiplist_test.cpp @@ -33,8 +33,8 @@ #include "gtest/gtest_pred_impl.h" #include "testutil/test_util.h" #include "util/hash_util.hpp" -#include "util/priority_thread_pool.hpp" #include "util/random.h" +#include "util/work_thread_pool.hpp" #include "vec/common/arena.h" namespace doris { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
