This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 23941ef3054 [improve](pipelineX) improve partition node dependency logical (#28399)
23941ef3054 is described below
commit 23941ef30542aa37c51bf430b44236978671e13b
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Dec 14 18:05:39 2023 +0800
[improve](pipelineX) improve partition node dependency logical (#28399)
---
.../pipeline/exec/partition_sort_sink_operator.cpp | 6 +++-
.../exec/partition_sort_source_operator.cpp | 12 ++++++--
.../pipeline/exec/partition_sort_source_operator.h | 34 ----------------------
be/src/pipeline/pipeline_x/dependency.h | 2 ++
4 files changed, 16 insertions(+), 38 deletions(-)
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 327d186d77b..478a3eedb20 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -151,7 +151,11 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
COUNTER_SET(local_state._hash_table_size_counter,
int64_t(local_state._num_partition));
//so all data from child have sink completed
- ((PartitionSortSourceDependency*)local_state._shared_state->source_dep)->set_always_ready();
+ {
+ std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
+ local_state._shared_state->sink_eos = true;
+ local_state._dependency->set_ready_to_read();
+ }
}
return Status::OK();
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 56db3852bc2..83da2f84112 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -49,11 +49,17 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
if (local_state._shared_state->blocks_buffer.empty() == false) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
- //if buffer have no data, block reading and wait for signal again
+ //if buffer have no data and sink not eos, block reading and wait for signal again
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, output_block,
output_block->columns()));
- if (local_state._shared_state->blocks_buffer.empty()) {
- local_state._dependency->block();
+ if (local_state._shared_state->blocks_buffer.empty() &&
+ local_state._shared_state->sink_eos == false) {
+ // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos.
+ // so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake
+ std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
+ if (local_state._shared_state->sink_eos == false) {
+ local_state._dependency->block();
+ }
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h
index 4d9c3f46951..0b955645532 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -55,40 +55,6 @@ public:
PartitionSortSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "PartitionSortSourceDependency",
query_ctx) {}
~PartitionSortSourceDependency() override = default;
-
- void block() override {
- if (_always_ready) {
- return;
- }
- std::unique_lock<std::mutex> lc(_always_done_lock);
- if (_always_ready) {
- return;
- }
- Dependency::block();
- }
-
- void set_always_ready() {
- if (_always_ready) {
- return;
- }
- std::unique_lock<std::mutex> lc(_always_done_lock);
- if (_always_ready) {
- return;
- }
- _always_ready = true;
- set_ready();
- }
-
- std::string debug_string(int indentation_level = 0) override {
- fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}, _always_ready = {}",
- Dependency::debug_string(indentation_level), _always_ready);
- return fmt::to_string(debug_string_buffer);
- }
-
-private:
- bool _always_ready {false};
- std::mutex _always_done_lock;
};
class PartitionSortSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 9e71774a004..952ee5b5234 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -472,6 +472,8 @@ public:
std::mutex buffer_mutex;
std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
std::unique_ptr<vectorized::SortCursorCmp> previous_row;
+ bool sink_eos = false;
+ std::mutex sink_eos_lock;
};
class AsyncWriterDependency final : public Dependency {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]