This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 9bb29ce164a [fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data (#46094)
9bb29ce164a is described below
commit 9bb29ce164aa4895e231e51a91ab117025505d6a
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Dec 27 15:28:04 2024 +0800
[fix](DataQueue) Fix thread conflict issue caused by concurrent calls to DataQueue::remaining_has_data (#46094)
---
be/src/pipeline/exec/cache_source_operator.cpp | 2 +-
be/src/pipeline/exec/data_queue.cpp | 11 -----------
be/src/pipeline/exec/data_queue.h | 7 ++++---
be/src/pipeline/exec/union_source_operator.cpp | 2 +-
4 files changed, 6 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp
index cace8465fc2..b515aeb4957 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -111,7 +111,7 @@ std::string CacheSourceLocalState::debug_string(int indentation_level) const {
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
- _shared_state->data_queue.remaining_has_data());
+ _shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 436a98e6b03..85354ece76a 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -72,17 +72,6 @@ void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int ch
_free_blocks[child_idx].emplace_back(std::move(block));
}
-//use sink to check can_write
-bool DataQueue::has_enough_space_to_push() {
- DCHECK(_cur_bytes_in_queue.size() == 1);
- return _cur_bytes_in_queue[0].load() < MAX_BYTE_OF_QUEUE / 2;
-}
-
-//use source to check can_read
-bool DataQueue::has_data_or_finished(int child_idx) {
- return remaining_has_data() || _is_finished[child_idx];
-}
-
//check which queue have data, and save the idx in _flag_queue_idx,
//so next loop, will check the record idx + 1 first
//maybe it's useful with many queue, others maybe always 0
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index d8a22888d9b..f2a849ee7dd 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -52,10 +52,11 @@ public:
bool is_finish(int child_idx = 0);
bool is_all_finish();
- bool has_enough_space_to_push();
- bool has_data_or_finished(int child_idx = 0);
+ // This function is not thread safe, should be called in Operator::get_block()
bool remaining_has_data();
+ bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }
+
int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
int64_t max_size_of_queue() const { return _max_size_of_queue; }
@@ -102,7 +103,7 @@ private:
//this only use to record the queue[0] for profile
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;
- static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
+ static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;
// data queue is multi sink one source
std::shared_ptr<Dependency> _source_dependency = nullptr;
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 942135453b4..f43cd604b68 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -90,7 +90,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const {
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
- _shared_state->data_queue.remaining_has_data());
+ _shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]