This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7839a0e708 [Bug](brpc) fix brpc failed on big query came concurrently (#22600)
7839a0e708 is described below
commit 7839a0e708c6b642ad49427b2a2a0866a81fec0a
Author: Pxl <[email protected]>
AuthorDate: Sat Aug 5 21:24:32 2023 +0800
[Bug](brpc) fix brpc failed on big query came concurrently (#22600)
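Fix PriorityThreadPool::get_info() reporting wrong numbers (an extra argument was passed to its format string).
Change the brpc heavy/light work pools from a priority thread pool to a FIFO thread pool.
Do not use the brpc pool when sending EOS (transmit_block requests without a block run inline when config::brpc_light_work_pool_threads == -1).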
---
be/src/olap/olap_server.cpp | 2 +-
be/src/olap/storage_engine.cpp | 2 +-
be/src/olap/storage_engine.h | 1 -
be/src/runtime/routine_load/data_consumer_group.h | 2 +-
.../routine_load/routine_load_task_executor.h | 2 +-
be/src/service/internal_service.cpp | 13 +++++++---
be/src/service/internal_service.h | 6 ++---
be/src/util/async_io.h | 2 +-
be/src/util/blocking_priority_queue.hpp | 2 +-
be/src/util/blocking_queue.hpp | 28 +++++++++++++++++-----
be/src/util/threadpool.h | 2 +-
...iority_thread_pool.hpp => work_thread_pool.hpp} | 28 +++++++++++++---------
be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +-
be/src/vec/exec/scan/scanner_scheduler.h | 1 -
be/test/olap/skiplist_test.cpp | 2 +-
15 files changed, 61 insertions(+), 34 deletions(-)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index e293499424..1733776099 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -72,12 +72,12 @@
#include "util/countdown_latch.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
-#include "util/priority_thread_pool.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
#include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
using std::string;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index fb07235702..727083e8f2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -73,13 +73,13 @@
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
-#include "util/priority_thread_pool.hpp"
#include "util/spinlock.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/trace.h"
#include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
using std::filesystem::directory_iterator;
using std::filesystem::path;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 3c3ecf33e5..99237a6687 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -61,7 +61,6 @@ class CumulativeCompaction;
class SingleReplicaCompaction;
class CumulativeCompactionPolicy;
class MemTracker;
-class PriorityThreadPool;
class StreamLoadRecorder;
class TCloneReq;
class TCreateTabletReq;
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index 6ede3d9f66..e7be39f5a6 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -27,8 +27,8 @@
#include "common/status.h"
#include "runtime/routine_load/data_consumer.h"
#include "util/blocking_queue.hpp"
-#include "util/priority_thread_pool.hpp"
#include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
namespace RdKafka {
class Message;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index b2ddfae2b7..90c1a06400 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -27,8 +27,8 @@
#include <vector>
#include "runtime/routine_load/data_consumer_pool.h"
-#include "util/priority_thread_pool.hpp"
#include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
namespace doris {
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index cf46ac1b29..3a5aa06125 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -172,7 +172,7 @@ template <typename T>
concept CanCancel = requires(T* response) { response->mutable_status(); };
template <CanCancel T>
-void offer_failed(T* response, google::protobuf::Closure* done, const
PriorityThreadPool& pool) {
+void offer_failed(T* response, google::protobuf::Closure* done, const
FifoThreadPool& pool) {
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
response->mutable_status()->add_error_msgs("fail to offer request to the
work pool, pool=" +
@@ -180,7 +180,7 @@ void offer_failed(T* response, google::protobuf::Closure*
done, const PriorityTh
}
template <typename T>
-void offer_failed(T* response, google::protobuf::Closure* done, const
PriorityThreadPool& pool) {
+void offer_failed(T* response, google::protobuf::Closure* done, const
FifoThreadPool& pool) {
brpc::ClosureGuard closure_guard(done);
LOG(WARNING) << "fail to offer request to the work pool, pool=" <<
pool.get_info();
}
@@ -983,7 +983,14 @@ void
PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
google::protobuf::Closure* done) {
int64_t receive_time = GetCurrentTimeNanos();
response->set_receive_time(receive_time);
- PriorityThreadPool& pool = request->has_block() ? _heavy_work_pool :
_light_work_pool;
+
+ if (!request->has_block() && config::brpc_light_work_pool_threads == -1) {
+ // under high concurrency, thread pool will have a lot of lock
contention.
+ _transmit_block(controller, request, response, done, Status::OK());
+ return;
+ }
+
+ FifoThreadPool& pool = request->has_block() ? _heavy_work_pool :
_light_work_pool;
bool ret = pool.try_offer([this, controller, request, response, done]() {
_transmit_block(controller, request, response, done, Status::OK());
});
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 823f29504b..aa30959ca3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -23,7 +23,7 @@
#include <string>
#include "common/status.h"
-#include "util/priority_thread_pool.hpp"
+#include "util/work_thread_pool.hpp"
namespace google {
namespace protobuf {
@@ -227,8 +227,8 @@ private:
// the reason see issue #16634
// define the interface for reading and writing data as heavy interface
// otherwise as light interface
- PriorityThreadPool _heavy_work_pool;
- PriorityThreadPool _light_work_pool;
+ FifoThreadPool _heavy_work_pool;
+ FifoThreadPool _light_work_pool;
};
} // namespace doris
diff --git a/be/src/util/async_io.h b/be/src/util/async_io.h
index fce50e79b7..9330b88d36 100644
--- a/be/src/util/async_io.h
+++ b/be/src/util/async_io.h
@@ -21,7 +21,7 @@
#include "io/fs/file_system.h"
#include "olap/olap_define.h"
-#include "priority_thread_pool.hpp"
+#include "work_thread_pool.hpp"
namespace doris {
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index 41196c5cfb..92d6875135 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -185,7 +185,7 @@ public:
return _queue.size();
}
- uint32_t get_max_size() const { return _max_element; }
+ uint32_t get_capacity() const { return _max_element; }
// Returns the total amount of time threads have blocked in blocking_get.
uint64_t total_get_wait_time() const { return _total_get_wait_time; }
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 35cd74ee89..0b66220a37 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -82,6 +82,26 @@ public:
return true;
}
+ // Return false if queue full or has been shutdown.
+ bool try_put(const T& val) {
+ if (_shutdown || _list.size() >= _max_elements) {
+ return false;
+ }
+
+ MonotonicStopWatch timer;
+ timer.start();
+ std::unique_lock<std::mutex> unique_lock(_lock);
+ _total_put_wait_time += timer.elapsed_time();
+
+ if (_shutdown || _list.size() >= _max_elements) {
+ return false;
+ }
+
+ _list.push_back(val);
+ _get_cv.notify_one();
+ return true;
+ }
+
// Shut down the queue. Wakes up all threads waiting on BlockingGet or
BlockingPut.
void shutdown() {
{
@@ -98,6 +118,8 @@ public:
return _list.size();
}
+ uint32_t get_capacity() const { return _max_elements; }
+
// Returns the total amount of time threads have blocked in BlockingGet.
uint64_t total_get_wait_time() const { return _total_get_wait_time; }
@@ -105,12 +127,6 @@ public:
uint64_t total_put_wait_time() const { return _total_put_wait_time; }
private:
- uint32_t SizeLocked(const std::unique_lock<std::mutex>& lock) const {
- // The size of 'get_list_' is read racily to avoid getting 'get_lock_'
in write path.
- DCHECK(lock.owns_lock());
- return _list.size();
- }
-
bool _shutdown;
const int _max_elements;
std::condition_variable _get_cv; // 'get' callers wait on this
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index d1b744b63e..3bb0a76c1c 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -38,13 +38,13 @@
#include <unordered_set>
#include "common/status.h"
+#include "util/work_thread_pool.hpp"
namespace doris {
class Thread;
class ThreadPool;
class ThreadPoolToken;
-class PriorityThreadPool;
class Runnable {
public:
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/work_thread_pool.hpp
similarity index 88%
rename from be/src/util/priority_thread_pool.hpp
rename to be/src/util/work_thread_pool.hpp
index 8f648d9d37..109ae6ded8 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/work_thread_pool.hpp
@@ -21,6 +21,7 @@
#include <thread>
#include "util/blocking_priority_queue.hpp"
+#include "util/blocking_queue.hpp"
#include "util/lock.h"
#include "util/thread.h"
#include "util/thread_group.h"
@@ -29,7 +30,8 @@ namespace doris {
// Simple threadpool which processes items (of type T) in parallel which were
placed on a
// blocking queue by Offer(). Each item is processed by a single user-supplied
method.
-class PriorityThreadPool {
+template <bool Priority = false>
+class WorkThreadPool {
public:
// Signature of a work-processing function. Takes the integer id of the
thread which is
// calling it (ids run from 0 to num_threads - 1) and a reference to the
item to
@@ -49,22 +51,25 @@ public:
}
};
+ using WorkQueue =
+ std::conditional_t<Priority, BlockingPriorityQueue<Task>,
BlockingQueue<Task>>;
+
// Creates a new thread pool and start num_threads threads.
// -- num_threads: how many threads are part of this pool
// -- queue_size: the maximum size of the queue on which work items are
offered. If the
// queue exceeds this size, subsequent calls to Offer will block until
there is
// capacity available.
- PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const
std::string& name)
+ WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const
std::string& name)
: _work_queue(queue_size), _shutdown(false), _name(name),
_active_threads(0) {
for (int i = 0; i < num_threads; ++i) {
_threads.create_thread(
-
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
+ std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread),
this, i));
}
}
// Destructor ensures that all threads are terminated before this object
is freed
// (otherwise they may continue to run and reference member variables)
- virtual ~PriorityThreadPool() {
+ virtual ~WorkThreadPool() {
shutdown();
join();
}
@@ -83,12 +88,12 @@ public:
virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
virtual bool offer(WorkFunction func) {
- PriorityThreadPool::Task task = {0, func, 0};
+ WorkThreadPool::Task task = {0, func, 0};
return _work_queue.blocking_put(task);
}
virtual bool try_offer(WorkFunction func) {
- PriorityThreadPool::Task task = {0, func, 0};
+ WorkThreadPool::Task task = {0, func, 0};
return _work_queue.try_put(task);
}
@@ -126,8 +131,8 @@ public:
return fmt::format(
"PriorityThreadPool(name={}, queue_size={}/{},
active_thread={}/{}, "
"total_get_wait_time={}, total_put_wait_time={})",
- _name, get_queue_size(), _work_queue.get_size(),
_work_queue.get_max_size(),
- _active_threads, _threads.size(),
_work_queue.total_get_wait_time(),
+ _name, get_queue_size(), _work_queue.get_capacity(),
_active_threads,
+ _threads.size(), _work_queue.total_get_wait_time(),
_work_queue.total_put_wait_time());
}
@@ -161,9 +166,7 @@ private:
}
}
- // Queue on which work items are held until a thread is available to
process them in
- // FIFO order.
- BlockingPriorityQueue<Task> _work_queue;
+ WorkQueue _work_queue;
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
@@ -171,4 +174,7 @@ private:
std::atomic<int> _active_threads;
};
+using PriorityThreadPool = WorkThreadPool<true>;
+using FifoThreadPool = WorkThreadPool<false>;
+
} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e72a01c952..bac67e72a9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -41,11 +41,11 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
-#include "util/priority_thread_pool.hpp"
#include "util/priority_work_stealing_thread_pool.hpp"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
+#include "util/work_thread_pool.hpp"
#include "vec/core/block.h"
#include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep
#include "vec/exec/scan/scanner_context.h"
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index ccc809becd..e669fd9b77 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -27,7 +27,6 @@
namespace doris {
class ExecEnv;
-class PriorityThreadPool;
namespace vectorized {
class VScanner;
diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp
index 667ad6ba4c..55c1b28bc5 100644
--- a/be/test/olap/skiplist_test.cpp
+++ b/be/test/olap/skiplist_test.cpp
@@ -33,8 +33,8 @@
#include "gtest/gtest_pred_impl.h"
#include "testutil/test_util.h"
#include "util/hash_util.hpp"
-#include "util/priority_thread_pool.hpp"
#include "util/random.h"
+#include "util/work_thread_pool.hpp"
#include "vec/common/arena.h"
namespace doris {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]