This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 327c545949b branch-3.0: [fix](blocking queue) introduce condition
variable wait timeout to avoid blocking queue deadlock #50906 (#51596)
327c545949b is described below
commit 327c545949b0c1f076cca32a80891c495064004b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 20:58:13 2025 +0800
branch-3.0: [fix](blocking queue) introduce condition variable wait timeout
to avoid blocking queue deadlock #50906 (#51596)
Cherry-picked from #50906
Co-authored-by: hui lai <[email protected]>
---
be/src/common/config.cpp | 3 +++
be/src/common/config.h | 3 +++
be/src/runtime/routine_load/data_consumer.cpp | 6 ++++--
.../runtime/routine_load/data_consumer_group.cpp | 2 +-
be/src/util/blocking_queue.hpp | 25 ++++++++++++++++++----
5 files changed, 32 insertions(+), 7 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2fc927dcbfd..04e04aa4b4b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -720,6 +720,9 @@ DEFINE_mInt32(max_consumer_num_per_group, "3");
// this should be larger than FE config 'max_routine_load_task_num_per_be'
(default 5)
DEFINE_Int32(max_routine_load_thread_pool_size, "1024");
+// timeout (in milliseconds) of each condition variable wait in controlled_blocking_get and controlled_blocking_put
+DEFINE_mInt32(blocking_queue_cv_wait_timeout_ms, "1000");
+
// max external scan cache batch count, means cache
max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will
be cached
DEFINE_mInt32(max_memory_sink_batch_count, "20");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c342b260288..51a275a55b8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -965,6 +965,9 @@ DECLARE_mString(kafka_debug);
// Change this size to 0 to fix it temporarily.
DECLARE_mInt32(routine_load_consumer_pool_size);
+// timeout (in milliseconds) of each condition variable wait in controlled_blocking_get and controlled_blocking_put
+DECLARE_mInt32(blocking_queue_cv_wait_timeout_ms);
+
// Used in single-stream-multi-table load. When receive a batch of messages
from kafka,
// if the size of batch is more than this threshold, we will request plans for
all related tables.
DECLARE_Int32(multi_table_batch_plan_threshold);
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index 00f9c726b41..aece4a791f0 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -243,7 +243,8 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
// ignore msg with length 0.
// put empty msg into queue will cause the load process
shutting down.
break;
- } else if (!queue->blocking_put(msg.get())) {
+ } else if (!queue->controlled_blocking_put(msg.get(),
+
config::blocking_queue_cv_wait_timeout_ms)) {
// queue is shutdown
done = true;
} else {
@@ -270,7 +271,8 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
VLOG_NOTICE << "consumer meet partition eof: " << _id
<< " partition offset: " << msg->offset();
_consuming_partition_ids.erase(msg->partition());
- if (!queue->blocking_put(msg.get())) {
+ if (!queue->controlled_blocking_put(msg.get(),
+
config::blocking_queue_cv_wait_timeout_ms)) {
done = true;
} else if (_consuming_partition_ids.size() <= 0) {
LOG(INFO) << "all partitions meet eof: " << _id;
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp
b/be/src/runtime/routine_load/data_consumer_group.cpp
index fc714fab6e0..feef4c7a9fb 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -156,7 +156,7 @@ Status
KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
}
RdKafka::Message* msg;
- bool res = _queue.blocking_get(&msg);
+ bool res = _queue.controlled_blocking_get(&msg,
config::blocking_queue_cv_wait_timeout_ms);
if (res) {
// conf has to be deleted finally
Defer delete_msg {[msg]() { delete msg; }};
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 6f325b7d6ab..cacf3b6decf 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -48,13 +48,21 @@ public:
// Get an element from the queue, waiting indefinitely for one to become
available.
// Returns false if we were shut down prior to getting the element, and
there
// are no more elements available.
- bool blocking_get(T* out) {
+ bool blocking_get(T* out) { return controlled_blocking_get(out,
MAX_CV_WAIT_TIMEOUT_MS); }
+
+ // blocking_get and blocking_put may deadlock and the root cause is still
+ // unknown; as a temporary workaround, wait on the condition variable with
+ // a timeout so that a blocked caller periodically re-checks the queue state.
+ bool controlled_blocking_get(T* out, int64_t cv_wait_timeout_ms) {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
while (!(_shutdown || !_list.empty())) {
++_get_waiting;
- _get_cv.wait(unique_lock);
+ if (_get_cv.wait_for(unique_lock,
std::chrono::milliseconds(cv_wait_timeout_ms)) ==
+ std::cv_status::timeout) {
+ _get_waiting--;
+ }
}
_total_get_wait_time += timer.elapsed_time();
@@ -75,13 +83,21 @@ public:
// Puts an element into the queue, waiting indefinitely until there is
space.
// If the queue is shut down, returns false.
- bool blocking_put(const T& val) {
+ bool blocking_put(const T& val) { return controlled_blocking_put(val,
MAX_CV_WAIT_TIMEOUT_MS); }
+
+ // blocking_get and blocking_put may deadlock and the root cause is still
+ // unknown; as a temporary workaround, wait on the condition variable with
+ // a timeout so that a blocked caller periodically re-checks the queue state.
+ bool controlled_blocking_put(const T& val, int64_t cv_wait_timeout_ms) {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
while (!(_shutdown || _list.size() < _max_elements)) {
++_put_waiting;
- _put_cv.wait(unique_lock);
+ if (_put_cv.wait_for(unique_lock,
std::chrono::milliseconds(cv_wait_timeout_ms)) ==
+ std::cv_status::timeout) {
+ _put_waiting--;
+ }
}
_total_put_wait_time += timer.elapsed_time();
@@ -147,6 +163,7 @@ public:
uint64_t total_put_wait_time() const { return _total_put_wait_time; }
private:
+ static constexpr int64_t MAX_CV_WAIT_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour
bool _shutdown;
const int _max_elements;
std::condition_variable _get_cv; // 'get' callers wait on this
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]