This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 327c545949b branch-3.0: [fix](blocking queue) introduce condition 
variable wait timeout to avoid blocking queue deadlock #50906 (#51596)
327c545949b is described below

commit 327c545949b0c1f076cca32a80891c495064004b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 20:58:13 2025 +0800

    branch-3.0: [fix](blocking queue) introduce condition variable wait timeout 
to avoid blocking queue deadlock #50906 (#51596)
    
    Cherry-picked from #50906
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/common/config.cpp                           |  3 +++
 be/src/common/config.h                             |  3 +++
 be/src/runtime/routine_load/data_consumer.cpp      |  6 ++++--
 .../runtime/routine_load/data_consumer_group.cpp   |  2 +-
 be/src/util/blocking_queue.hpp                     | 25 ++++++++++++++++++----
 5 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2fc927dcbfd..04e04aa4b4b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -720,6 +720,9 @@ DEFINE_mInt32(max_consumer_num_per_group, "3");
 // this should be larger than FE config 'max_routine_load_task_num_per_be' 
(default 5)
 DEFINE_Int32(max_routine_load_thread_pool_size, "1024");
 
+// timeout (ms) of each condition variable wait in controlled_blocking_get/put
+DEFINE_mInt32(blocking_queue_cv_wait_timeout_ms, "1000");
+
 // max external scan cache batch count, means cache 
max_memory_cache_batch_count * batch_size row
 // default is 20, batch_size's default value is 1024 means 20 * 1024 rows will 
be cached
 DEFINE_mInt32(max_memory_sink_batch_count, "20");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c342b260288..51a275a55b8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -965,6 +965,9 @@ DECLARE_mString(kafka_debug);
 // Change this size to 0 to fix it temporarily.
 DECLARE_mInt32(routine_load_consumer_pool_size);
 
+// timeout (ms) of each condition variable wait in controlled_blocking_get/put
+DECLARE_mInt32(blocking_queue_cv_wait_timeout_ms);
+
 // Used in single-stream-multi-table load. When receive a batch of messages 
from kafka,
 // if the size of batch is more than this threshold, we will request plans for 
all related tables.
 DECLARE_Int32(multi_table_batch_plan_threshold);
diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index 00f9c726b41..aece4a791f0 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -243,7 +243,8 @@ Status 
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                 // ignore msg with length 0.
                 // put empty msg into queue will cause the load process 
shutting down.
                 break;
-            } else if (!queue->blocking_put(msg.get())) {
+            } else if (!queue->controlled_blocking_put(msg.get(),
+                                                       
config::blocking_queue_cv_wait_timeout_ms)) {
                 // queue is shutdown
                 done = true;
             } else {
@@ -270,7 +271,8 @@ Status 
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             VLOG_NOTICE << "consumer meet partition eof: " << _id
                         << " partition offset: " << msg->offset();
             _consuming_partition_ids.erase(msg->partition());
-            if (!queue->blocking_put(msg.get())) {
+            if (!queue->controlled_blocking_put(msg.get(),
+                                                
config::blocking_queue_cv_wait_timeout_ms)) {
                 done = true;
             } else if (_consuming_partition_ids.size() <= 0) {
                 LOG(INFO) << "all partitions meet eof: " << _id;
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp 
b/be/src/runtime/routine_load/data_consumer_group.cpp
index fc714fab6e0..feef4c7a9fb 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -156,7 +156,7 @@ Status 
KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
         }
 
         RdKafka::Message* msg;
-        bool res = _queue.blocking_get(&msg);
+        bool res = _queue.controlled_blocking_get(&msg, 
config::blocking_queue_cv_wait_timeout_ms);
         if (res) {
             // conf has to be deleted finally
             Defer delete_msg {[msg]() { delete msg; }};
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 6f325b7d6ab..cacf3b6decf 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -48,13 +48,21 @@ public:
     // Get an element from the queue, waiting indefinitely for one to become 
available.
     // Returns false if we were shut down prior to getting the element, and 
there
     // are no more elements available.
-    bool blocking_get(T* out) {
+    bool blocking_get(T* out) { return controlled_blocking_get(out, 
MAX_CV_WAIT_TIMEOUT_MS); }
+
+    // blocking_get and blocking_put may deadlock, and the root cause has not
+    // been found yet. As a temporary workaround, each condition variable wait
+    // is bounded by a timeout so that the waiter re-checks the queue state.
+    bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
         while (!(_shutdown || !_list.empty())) {
             ++_get_waiting;
-            _get_cv.wait(unique_lock);
+            if (_get_cv.wait_for(unique_lock, 
std::chrono::milliseconds(cv_wait_timeout_ms)) ==
+                std::cv_status::timeout) {
+                _get_waiting--;
+            }
         }
         _total_get_wait_time += timer.elapsed_time();
 
@@ -75,13 +83,21 @@ public:
 
     // Puts an element into the queue, waiting indefinitely until there is 
space.
     // If the queue is shut down, returns false.
-    bool blocking_put(const T& val) {
+    bool blocking_put(const T& val) { return controlled_blocking_put(val, 
MAX_CV_WAIT_TIMEOUT_MS); }
+
+    // blocking_get and blocking_put may deadlock, and the root cause has not
+    // been found yet. As a temporary workaround, each condition variable wait
+    // is bounded by a timeout so that the waiter re-checks the queue state.
+    bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
         while (!(_shutdown || _list.size() < _max_elements)) {
             ++_put_waiting;
-            _put_cv.wait(unique_lock);
+            if (_put_cv.wait_for(unique_lock, 
std::chrono::milliseconds(cv_wait_timeout_ms)) ==
+                std::cv_status::timeout) {
+                _put_waiting--;
+            }
         }
         _total_put_wait_time += timer.elapsed_time();
 
@@ -147,6 +163,7 @@ public:
     uint64_t total_put_wait_time() const { return _total_put_wait_time; }
 
 private:
+    static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour
     bool _shutdown;
     const int _max_elements;
     std::condition_variable _get_cv; // 'get' callers wait on this


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to