This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7839a0e708 [Bug](brpc) fix brpc failed on big query came concurrently 
(#22600)
7839a0e708 is described below

commit 7839a0e708c6b642ad49427b2a2a0866a81fec0a
Author: Pxl <[email protected]>
AuthorDate: Sat Aug 5 21:24:32 2023 +0800

    [Bug](brpc) fix brpc failed on big query came concurrently (#22600)
    
    fix PriorityThreadPool::get_info reporting wrong numbers
    change the brpc work pools from priority to FIFO
    do not use the brpc pool when sending EOS
---
 be/src/olap/olap_server.cpp                        |  2 +-
 be/src/olap/storage_engine.cpp                     |  2 +-
 be/src/olap/storage_engine.h                       |  1 -
 be/src/runtime/routine_load/data_consumer_group.h  |  2 +-
 .../routine_load/routine_load_task_executor.h      |  2 +-
 be/src/service/internal_service.cpp                | 13 +++++++---
 be/src/service/internal_service.h                  |  6 ++---
 be/src/util/async_io.h                             |  2 +-
 be/src/util/blocking_priority_queue.hpp            |  2 +-
 be/src/util/blocking_queue.hpp                     | 28 +++++++++++++++++-----
 be/src/util/threadpool.h                           |  2 +-
 ...iority_thread_pool.hpp => work_thread_pool.hpp} | 28 +++++++++++++---------
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  2 +-
 be/src/vec/exec/scan/scanner_scheduler.h           |  1 -
 be/test/olap/skiplist_test.cpp                     |  2 +-
 15 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index e293499424..1733776099 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -72,12 +72,12 @@
 #include "util/countdown_latch.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/time.h"
 #include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
 
 using std::string;
 
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index fb07235702..727083e8f2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -73,13 +73,13 @@
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/doris_metrics.h"
 #include "util/metrics.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/spinlock.h"
 #include "util/stopwatch.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/trace.h"
 #include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
 
 using std::filesystem::directory_iterator;
 using std::filesystem::path;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 3c3ecf33e5..99237a6687 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -61,7 +61,6 @@ class CumulativeCompaction;
 class SingleReplicaCompaction;
 class CumulativeCompactionPolicy;
 class MemTracker;
-class PriorityThreadPool;
 class StreamLoadRecorder;
 class TCloneReq;
 class TCreateTabletReq;
diff --git a/be/src/runtime/routine_load/data_consumer_group.h 
b/be/src/runtime/routine_load/data_consumer_group.h
index 6ede3d9f66..e7be39f5a6 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -27,8 +27,8 @@
 #include "common/status.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "util/blocking_queue.hpp"
-#include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
 
 namespace RdKafka {
 class Message;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index b2ddfae2b7..90c1a06400 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -27,8 +27,8 @@
 #include <vector>
 
 #include "runtime/routine_load/data_consumer_pool.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
+#include "util/work_thread_pool.hpp"
 
 namespace doris {
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index cf46ac1b29..3a5aa06125 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -172,7 +172,7 @@ template <typename T>
 concept CanCancel = requires(T* response) { response->mutable_status(); };
 
 template <CanCancel T>
-void offer_failed(T* response, google::protobuf::Closure* done, const 
PriorityThreadPool& pool) {
+void offer_failed(T* response, google::protobuf::Closure* done, const 
FifoThreadPool& pool) {
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
     response->mutable_status()->add_error_msgs("fail to offer request to the 
work pool, pool=" +
@@ -180,7 +180,7 @@ void offer_failed(T* response, google::protobuf::Closure* 
done, const PriorityTh
 }
 
 template <typename T>
-void offer_failed(T* response, google::protobuf::Closure* done, const 
PriorityThreadPool& pool) {
+void offer_failed(T* response, google::protobuf::Closure* done, const 
FifoThreadPool& pool) {
     brpc::ClosureGuard closure_guard(done);
     LOG(WARNING) << "fail to offer request to the work pool, pool=" << 
pool.get_info();
 }
@@ -983,7 +983,14 @@ void 
PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
                                           google::protobuf::Closure* done) {
     int64_t receive_time = GetCurrentTimeNanos();
     response->set_receive_time(receive_time);
-    PriorityThreadPool& pool = request->has_block() ? _heavy_work_pool : 
_light_work_pool;
+
+    if (!request->has_block() && config::brpc_light_work_pool_threads == -1) {
+        // under high concurrency, thread pool will have a lot of lock 
contention.
+        _transmit_block(controller, request, response, done, Status::OK());
+        return;
+    }
+
+    FifoThreadPool& pool = request->has_block() ? _heavy_work_pool : 
_light_work_pool;
     bool ret = pool.try_offer([this, controller, request, response, done]() {
         _transmit_block(controller, request, response, done, Status::OK());
     });
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 823f29504b..aa30959ca3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -23,7 +23,7 @@
 #include <string>
 
 #include "common/status.h"
-#include "util/priority_thread_pool.hpp"
+#include "util/work_thread_pool.hpp"
 
 namespace google {
 namespace protobuf {
@@ -227,8 +227,8 @@ private:
     // the reason see issue #16634
     // define the interface for reading and writing data as heavy interface
     // otherwise as light interface
-    PriorityThreadPool _heavy_work_pool;
-    PriorityThreadPool _light_work_pool;
+    FifoThreadPool _heavy_work_pool;
+    FifoThreadPool _light_work_pool;
 };
 
 } // namespace doris
diff --git a/be/src/util/async_io.h b/be/src/util/async_io.h
index fce50e79b7..9330b88d36 100644
--- a/be/src/util/async_io.h
+++ b/be/src/util/async_io.h
@@ -21,7 +21,7 @@
 
 #include "io/fs/file_system.h"
 #include "olap/olap_define.h"
-#include "priority_thread_pool.hpp"
+#include "work_thread_pool.hpp"
 
 namespace doris {
 
diff --git a/be/src/util/blocking_priority_queue.hpp 
b/be/src/util/blocking_priority_queue.hpp
index 41196c5cfb..92d6875135 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -185,7 +185,7 @@ public:
         return _queue.size();
     }
 
-    uint32_t get_max_size() const { return _max_element; }
+    uint32_t get_capacity() const { return _max_element; }
 
     // Returns the total amount of time threads have blocked in blocking_get.
     uint64_t total_get_wait_time() const { return _total_get_wait_time; }
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 35cd74ee89..0b66220a37 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -82,6 +82,26 @@ public:
         return true;
     }
 
+    // Return false if queue full or has been shutdown.
+    bool try_put(const T& val) {
+        if (_shutdown || _list.size() >= _max_elements) {
+            return false;
+        }
+
+        MonotonicStopWatch timer;
+        timer.start();
+        std::unique_lock<std::mutex> unique_lock(_lock);
+        _total_put_wait_time += timer.elapsed_time();
+
+        if (_shutdown || _list.size() >= _max_elements) {
+            return false;
+        }
+
+        _list.push_back(val);
+        _get_cv.notify_one();
+        return true;
+    }
+
     // Shut down the queue. Wakes up all threads waiting on BlockingGet or 
BlockingPut.
     void shutdown() {
         {
@@ -98,6 +118,8 @@ public:
         return _list.size();
     }
 
+    uint32_t get_capacity() const { return _max_elements; }
+
     // Returns the total amount of time threads have blocked in BlockingGet.
     uint64_t total_get_wait_time() const { return _total_get_wait_time; }
 
@@ -105,12 +127,6 @@ public:
     uint64_t total_put_wait_time() const { return _total_put_wait_time; }
 
 private:
-    uint32_t SizeLocked(const std::unique_lock<std::mutex>& lock) const {
-        // The size of 'get_list_' is read racily to avoid getting 'get_lock_' 
in write path.
-        DCHECK(lock.owns_lock());
-        return _list.size();
-    }
-
     bool _shutdown;
     const int _max_elements;
     std::condition_variable _get_cv; // 'get' callers wait on this
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index d1b744b63e..3bb0a76c1c 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -38,13 +38,13 @@
 #include <unordered_set>
 
 #include "common/status.h"
+#include "util/work_thread_pool.hpp"
 
 namespace doris {
 
 class Thread;
 class ThreadPool;
 class ThreadPoolToken;
-class PriorityThreadPool;
 
 class Runnable {
 public:
diff --git a/be/src/util/priority_thread_pool.hpp 
b/be/src/util/work_thread_pool.hpp
similarity index 88%
rename from be/src/util/priority_thread_pool.hpp
rename to be/src/util/work_thread_pool.hpp
index 8f648d9d37..109ae6ded8 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/work_thread_pool.hpp
@@ -21,6 +21,7 @@
 #include <thread>
 
 #include "util/blocking_priority_queue.hpp"
+#include "util/blocking_queue.hpp"
 #include "util/lock.h"
 #include "util/thread.h"
 #include "util/thread_group.h"
@@ -29,7 +30,8 @@ namespace doris {
 
 // Simple threadpool which processes items (of type T) in parallel which were 
placed on a
 // blocking queue by Offer(). Each item is processed by a single user-supplied 
method.
-class PriorityThreadPool {
+template <bool Priority = false>
+class WorkThreadPool {
 public:
     // Signature of a work-processing function. Takes the integer id of the 
thread which is
     // calling it (ids run from 0 to num_threads - 1) and a reference to the 
item to
@@ -49,22 +51,25 @@ public:
         }
     };
 
+    using WorkQueue =
+            std::conditional_t<Priority, BlockingPriorityQueue<Task>, 
BlockingQueue<Task>>;
+
     // Creates a new thread pool and start num_threads threads.
     //  -- num_threads: how many threads are part of this pool
     //  -- queue_size: the maximum size of the queue on which work items are 
offered. If the
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
-    PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const 
std::string& name)
+    WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const 
std::string& name)
             : _work_queue(queue_size), _shutdown(false), _name(name), 
_active_threads(0) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
-                    
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
+                    std::bind<void>(std::mem_fn(&WorkThreadPool::work_thread), 
this, i));
         }
     }
 
     // Destructor ensures that all threads are terminated before this object 
is freed
     // (otherwise they may continue to run and reference member variables)
-    virtual ~PriorityThreadPool() {
+    virtual ~WorkThreadPool() {
         shutdown();
         join();
     }
@@ -83,12 +88,12 @@ public:
     virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
 
     virtual bool offer(WorkFunction func) {
-        PriorityThreadPool::Task task = {0, func, 0};
+        WorkThreadPool::Task task = {0, func, 0};
         return _work_queue.blocking_put(task);
     }
 
     virtual bool try_offer(WorkFunction func) {
-        PriorityThreadPool::Task task = {0, func, 0};
+        WorkThreadPool::Task task = {0, func, 0};
         return _work_queue.try_put(task);
     }
 
@@ -126,8 +131,8 @@ public:
         return fmt::format(
                 "PriorityThreadPool(name={}, queue_size={}/{}, 
active_thread={}/{}, "
                 "total_get_wait_time={}, total_put_wait_time={})",
-                _name, get_queue_size(), _work_queue.get_size(), 
_work_queue.get_max_size(),
-                _active_threads, _threads.size(), 
_work_queue.total_get_wait_time(),
+                _name, get_queue_size(), _work_queue.get_capacity(), 
_active_threads,
+                _threads.size(), _work_queue.total_get_wait_time(),
                 _work_queue.total_put_wait_time());
     }
 
@@ -161,9 +166,7 @@ private:
         }
     }
 
-    // Queue on which work items are held until a thread is available to 
process them in
-    // FIFO order.
-    BlockingPriorityQueue<Task> _work_queue;
+    WorkQueue _work_queue;
 
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
@@ -171,4 +174,7 @@ private:
     std::atomic<int> _active_threads;
 };
 
+using PriorityThreadPool = WorkThreadPool<true>;
+using FifoThreadPool = WorkThreadPool<false>;
+
 } // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e72a01c952..bac67e72a9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -41,11 +41,11 @@
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
 #include "util/defer_op.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/priority_work_stealing_thread_pool.hpp"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
+#include "util/work_thread_pool.hpp"
 #include "vec/core/block.h"
 #include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep
 #include "vec/exec/scan/scanner_context.h"
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index ccc809becd..e669fd9b77 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -27,7 +27,6 @@
 
 namespace doris {
 class ExecEnv;
-class PriorityThreadPool;
 
 namespace vectorized {
 class VScanner;
diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp
index 667ad6ba4c..55c1b28bc5 100644
--- a/be/test/olap/skiplist_test.cpp
+++ b/be/test/olap/skiplist_test.cpp
@@ -33,8 +33,8 @@
 #include "gtest/gtest_pred_impl.h"
 #include "testutil/test_util.h"
 #include "util/hash_util.hpp"
-#include "util/priority_thread_pool.hpp"
 #include "util/random.h"
+#include "util/work_thread_pool.hpp"
 #include "vec/common/arena.h"
 
 namespace doris {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to