This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 23941ef3054 [improve](pipelineX) improve partition node dependency 
logical (#28399)
23941ef3054 is described below

commit 23941ef30542aa37c51bf430b44236978671e13b
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Dec 14 18:05:39 2023 +0800

    [improve](pipelineX) improve partition node dependency logical (#28399)
---
 .../pipeline/exec/partition_sort_sink_operator.cpp |  6 +++-
 .../exec/partition_sort_source_operator.cpp        | 12 ++++++--
 .../pipeline/exec/partition_sort_source_operator.h | 34 ----------------------
 be/src/pipeline/pipeline_x/dependency.h            |  2 ++
 4 files changed, 16 insertions(+), 38 deletions(-)

diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 327d186d77b..478a3eedb20 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -151,7 +151,11 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         COUNTER_SET(local_state._hash_table_size_counter, 
int64_t(local_state._num_partition));
         //so all data from child have sink completed
-        
((PartitionSortSourceDependency*)local_state._shared_state->source_dep)->set_always_ready();
+        {
+            std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+            local_state._shared_state->sink_eos = true;
+            local_state._dependency->set_ready_to_read();
+        }
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 56db3852bc2..83da2f84112 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -49,11 +49,17 @@ Status 
PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
         if (local_state._shared_state->blocks_buffer.empty() == false) {
             
local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
-            //if buffer have no data, block reading and wait for signal again
+            // If the buffer has no data and the sink has not reached eos, block reading and wait for the signal again.
             RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
                     local_state._conjuncts, output_block, 
output_block->columns()));
-            if (local_state._shared_state->blocks_buffer.empty()) {
-                local_state._dependency->block();
+            if (local_state._shared_state->blocks_buffer.empty() &&
+                local_state._shared_state->sink_eos == false) {
+                // Check sink_eos again under the mutex: in some cases this side may be calling block() while the sink is setting eos.
+                // block() must therefore be called while holding the mutex, to avoid the case where the sink has already set eos and set ready, but block() is then called here by mistake.
+                std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+                if (local_state._shared_state->sink_eos == false) {
+                    local_state._dependency->block();
+                }
             }
             return Status::OK();
         }
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h 
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 4d9c3f46951..0b955645532 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -55,40 +55,6 @@ public:
     PartitionSortSourceDependency(int id, int node_id, QueryContext* query_ctx)
             : Dependency(id, node_id, "PartitionSortSourceDependency", 
query_ctx) {}
     ~PartitionSortSourceDependency() override = default;
-
-    void block() override {
-        if (_always_ready) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_always_done_lock);
-        if (_always_ready) {
-            return;
-        }
-        Dependency::block();
-    }
-
-    void set_always_ready() {
-        if (_always_ready) {
-            return;
-        }
-        std::unique_lock<std::mutex> lc(_always_done_lock);
-        if (_always_ready) {
-            return;
-        }
-        _always_ready = true;
-        set_ready();
-    }
-
-    std::string debug_string(int indentation_level = 0) override {
-        fmt::memory_buffer debug_string_buffer;
-        fmt::format_to(debug_string_buffer, "{}, _always_ready = {}",
-                       Dependency::debug_string(indentation_level), 
_always_ready);
-        return fmt::to_string(debug_string_buffer);
-    }
-
-private:
-    bool _always_ready {false};
-    std::mutex _always_done_lock;
 };
 
 class PartitionSortSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 9e71774a004..952ee5b5234 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -472,6 +472,8 @@ public:
     std::mutex buffer_mutex;
     std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
     std::unique_ptr<vectorized::SortCursorCmp> previous_row;
+    bool sink_eos = false;
+    std::mutex sink_eos_lock;
 };
 
 class AsyncWriterDependency final : public Dependency {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to