This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 78b05a1139e [feature](pipeline) local merge sort ready when all queue has data (#35992)
78b05a1139e is described below

commit 78b05a1139e9711f872695c9ed31b4fdc89a6c07
Author: Mryange <[email protected]>
AuthorDate: Wed Jun 12 19:44:08 2024 +0800

    [feature](pipeline) local merge sort ready when all queue has data (#35992)
    
     local merge sort ready when all queue has data
---
 .../local_exchange_sink_operator.cpp               |  9 ++++++++
 .../local_exchange/local_exchange_sink_operator.h  |  8 +------
 .../local_exchange_source_operator.cpp             |  9 ++++++++
 .../local_exchange_source_operator.h               |  2 ++
 be/src/pipeline/local_exchange/local_exchanger.cpp | 25 ++++++++++++++++++++++
 be/src/pipeline/local_exchange/local_exchanger.h   | 19 ++++++++++------
 6 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index a310b921b18..ba51a2da39b 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -25,6 +25,15 @@ namespace doris::pipeline {
 
 LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = default;
 
+std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
+    auto deps = Base::dependencies();
+    auto exchanger_deps = _exchanger->local_sink_state_dependency(_channel_id);
+    for (auto* dep : exchanger_deps) {
+        deps.push_back(dep);
+    }
+    return deps;
+}
+
 Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets,
                                         const bool is_shuffled_hash_join,
                                         const std::map<int, int>& shuffle_idx_to_instance_idx) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index c29d6a7ec90..0ff1df26001 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -45,13 +45,7 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     std::string debug_string(int indentation_level) const override;
-    std::vector<Dependency*> dependencies() const override {
-        auto deps = Base::dependencies();
-        if (auto local_state_sink_dep = _exchanger->get_local_state_dependency(_channel_id)) {
-            deps.push_back(local_state_sink_dep.get());
-        }
-        return deps;
-    }
+    std::vector<Dependency*> dependencies() const override;
 
 private:
     friend class LocalExchangeSinkOperatorX;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 79aba88fbaa..56f0a157cde 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -62,6 +62,15 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
     return Base::close(state);
 }
 
+std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
+    auto deps = Base::dependencies();
+    auto exchanger_deps = _exchanger->local_state_dependency(_channel_id);
+    for (auto* dep : exchanger_deps) {
+        deps.push_back(dep);
+    }
+    return deps;
+}
+
 std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 58086097d6d..f9fa4cfa4ed 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -40,6 +40,8 @@ public:
     Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
+    std::vector<Dependency*> dependencies() const override;
+
 private:
     friend class LocalExchangeSourceOperatorX;
     friend class ExchangerBase;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 7b9186cb7c8..466ca860398 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -288,9 +288,13 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
     new_block.swap(*in_block);
     DCHECK_LE(local_state._channel_id, _data_queue.size());
     add_mem_usage(local_state, new_block.allocated_bytes());
+
     if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(0);
     }
+    if (eos) {
+        _queue_deps[local_state._channel_id]->set_always_ready();
+    }
     return Status::OK();
 }
 
@@ -361,6 +365,7 @@ void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& local_s
     if (_queues_mem_usege[channel_id].fetch_add(delta) > _each_queue_limit) {
         _sink_deps[channel_id]->block();
     }
+    _queue_deps[channel_id]->set_ready();
 }
 
 void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState& 
local_state,
@@ -369,6 +374,26 @@ void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState& local
     if (_queues_mem_usege[channel_id].fetch_sub(delta) <= _each_queue_limit) {
         _sink_deps[channel_id]->set_ready();
     }
+    // if queue empty , block this queue
+    if (_queues_mem_usege[channel_id] == 0) {
+        _queue_deps[channel_id]->block();
+    }
+}
+
+std::vector<Dependency*> LocalMergeSortExchanger::local_sink_state_dependency(int channel_id) {
+    DCHECK(_sink_deps[channel_id]);
+    return {_sink_deps[channel_id].get()};
+}
+
+std::vector<Dependency*> LocalMergeSortExchanger::local_state_dependency(int channel_id) {
+    if (channel_id != 0) {
+        return {};
+    }
+    std::vector<Dependency*> deps;
+    for (auto depSptr : _queue_deps) {
+        deps.push_back(depSptr.get());
+    }
+    return deps;
 }
 
 Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 113f4906ff8..741b86aa8bb 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -52,7 +52,8 @@ public:
     virtual ExchangeType get_type() const = 0;
     virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
 
-    virtual DependencySPtr get_local_state_dependency(int _channel_id) { return nullptr; }
+    virtual std::vector<Dependency*> local_sink_state_dependency(int channel_id) { return {}; }
+    virtual std::vector<Dependency*> local_state_dependency(int channel_id) { return {}; }
 
 protected:
     friend struct LocalExchangeSharedState;
@@ -234,7 +235,10 @@ public:
         for (size_t i = 0; i < num_partitions; i++) {
             _queues_mem_usege[i] = 0;
             _sink_deps.push_back(
-                    std::make_shared<Dependency>(0, 0, "LOCAL_MERGE_SORT_DEPENDENCY", true));
+                    std::make_shared<Dependency>(0, 0, "LOCAL_MERGE_SORT_SINK_DEPENDENCY", true));
+            _queue_deps.push_back(
+                    std::make_shared<Dependency>(0, 0, "LOCAL_MERGE_SORT_QUEUE_DEPENDENCY"));
+            _queue_deps.back()->block();
         }
     }
     ~LocalMergeSortExchanger() override = default;
@@ -247,20 +251,23 @@ public:
 
     Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState& local_state);
 
-    DependencySPtr get_local_state_dependency(int channel_id) override {
-        DCHECK(_sink_deps[channel_id]);
-        return _sink_deps[channel_id];
-    }
+    std::vector<Dependency*> local_sink_state_dependency(int channel_id) override;
+
+    std::vector<Dependency*> local_state_dependency(int channel_id) override;
 
     void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta);
     void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int channel_id, int64_t delta);
     void close(LocalExchangeSourceLocalState& local_state) override {}
 
 private:
+    // only channel_id = 0 , build _merger and use it
+
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
     std::shared_ptr<SortSourceOperatorX> _sort_source;
     std::vector<DependencySPtr> _sink_deps;
     std::vector<std::atomic_int64_t> _queues_mem_usege;
+    // if cur queue is empty, block this queue
+    std::vector<DependencySPtr> _queue_deps;
     const int64_t _each_queue_limit;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to