This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 78b05a1139e [feature](pipeline) local merge sort ready when all queue
has data (#35992)
78b05a1139e is described below
commit 78b05a1139e9711f872695c9ed31b4fdc89a6c07
Author: Mryange <[email protected]>
AuthorDate: Wed Jun 12 19:44:08 2024 +0800
[feature](pipeline) local merge sort ready when all queue has data (#35992)
Make the local merge sort source ready only when every queue has data (or has reached EOS).
---
.../local_exchange_sink_operator.cpp | 9 ++++++++
.../local_exchange/local_exchange_sink_operator.h | 8 +------
.../local_exchange_source_operator.cpp | 9 ++++++++
.../local_exchange_source_operator.h | 2 ++
be/src/pipeline/local_exchange/local_exchanger.cpp | 25 ++++++++++++++++++++++
be/src/pipeline/local_exchange/local_exchanger.h | 19 ++++++++++------
6 files changed, 59 insertions(+), 13 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index a310b921b18..ba51a2da39b 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -25,6 +25,15 @@ namespace doris::pipeline {
LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = default;
+std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
+ auto deps = Base::dependencies();
+ auto exchanger_deps = _exchanger->local_sink_state_dependency(_channel_id);
+ for (auto* dep : exchanger_deps) {
+ deps.push_back(dep);
+ }
+ return deps;
+}
+
Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int
num_buckets,
const bool is_shuffled_hash_join,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index c29d6a7ec90..0ff1df26001 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -45,13 +45,7 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
std::string debug_string(int indentation_level) const override;
- std::vector<Dependency*> dependencies() const override {
- auto deps = Base::dependencies();
- if (auto local_state_sink_dep =
_exchanger->get_local_state_dependency(_channel_id)) {
- deps.push_back(local_state_sink_dep.get());
- }
- return deps;
- }
+ std::vector<Dependency*> dependencies() const override;
private:
friend class LocalExchangeSinkOperatorX;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 79aba88fbaa..56f0a157cde 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -62,6 +62,15 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
return Base::close(state);
}
+std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
+ auto deps = Base::dependencies();
+ auto exchanger_deps = _exchanger->local_state_dependency(_channel_id);
+ for (auto* dep : exchanger_deps) {
+ deps.push_back(dep);
+ }
+ return deps;
+}
+
std::string LocalExchangeSourceLocalState::debug_string(int indentation_level)
const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 58086097d6d..f9fa4cfa4ed 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -40,6 +40,8 @@ public:
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const override;
+ std::vector<Dependency*> dependencies() const override;
+
private:
friend class LocalExchangeSourceOperatorX;
friend class ExchangerBase;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 7b9186cb7c8..466ca860398 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -288,9 +288,13 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state,
vectorized::Block* in_
new_block.swap(*in_block);
DCHECK_LE(local_state._channel_id, _data_queue.size());
add_mem_usage(local_state, new_block.allocated_bytes());
+
if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
}
+ if (eos) {
+ _queue_deps[local_state._channel_id]->set_always_ready();
+ }
return Status::OK();
}
@@ -361,6 +365,7 @@ void
LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& local_s
if (_queues_mem_usege[channel_id].fetch_add(delta) > _each_queue_limit) {
_sink_deps[channel_id]->block();
}
+ _queue_deps[channel_id]->set_ready();
}
void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState&
local_state,
@@ -369,6 +374,26 @@ void
LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState& local
if (_queues_mem_usege[channel_id].fetch_sub(delta) <= _each_queue_limit) {
_sink_deps[channel_id]->set_ready();
}
+ // If this queue's tracked memory usage is zero (the queue is empty), block its queue dependency.
+ if (_queues_mem_usege[channel_id] == 0) {
+ _queue_deps[channel_id]->block();
+ }
+}
+
+std::vector<Dependency*>
LocalMergeSortExchanger::local_sink_state_dependency(int channel_id) {
+ DCHECK(_sink_deps[channel_id]);
+ return {_sink_deps[channel_id].get()};
+}
+
+std::vector<Dependency*> LocalMergeSortExchanger::local_state_dependency(int
channel_id) {
+ if (channel_id != 0) {
+ return {};
+ }
+ std::vector<Dependency*> deps;
+ for (auto depSptr : _queue_deps) {
+ deps.push_back(depSptr.get());
+ }
+ return deps;
}
Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos,
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 113f4906ff8..741b86aa8bb 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -52,7 +52,8 @@ public:
virtual ExchangeType get_type() const = 0;
virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
- virtual DependencySPtr get_local_state_dependency(int _channel_id) {
return nullptr; }
+ virtual std::vector<Dependency*> local_sink_state_dependency(int
channel_id) { return {}; }
+ virtual std::vector<Dependency*> local_state_dependency(int channel_id) {
return {}; }
protected:
friend struct LocalExchangeSharedState;
@@ -234,7 +235,10 @@ public:
for (size_t i = 0; i < num_partitions; i++) {
_queues_mem_usege[i] = 0;
_sink_deps.push_back(
- std::make_shared<Dependency>(0, 0,
"LOCAL_MERGE_SORT_DEPENDENCY", true));
+ std::make_shared<Dependency>(0, 0,
"LOCAL_MERGE_SORT_SINK_DEPENDENCY", true));
+ _queue_deps.push_back(
+ std::make_shared<Dependency>(0, 0,
"LOCAL_MERGE_SORT_QUEUE_DEPENDENCY"));
+ _queue_deps.back()->block();
}
}
~LocalMergeSortExchanger() override = default;
@@ -247,20 +251,23 @@ public:
Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState&
local_state);
- DependencySPtr get_local_state_dependency(int channel_id) override {
- DCHECK(_sink_deps[channel_id]);
- return _sink_deps[channel_id];
- }
+ std::vector<Dependency*> local_sink_state_dependency(int channel_id)
override;
+
+ std::vector<Dependency*> local_state_dependency(int channel_id) override;
void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t
delta);
void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int
channel_id, int64_t delta);
void close(LocalExchangeSourceLocalState& local_state) override {}
private:
+ // Only the source with channel_id == 0 builds _merger and uses it.
+
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
std::shared_ptr<SortSourceOperatorX> _sort_source;
std::vector<DependencySPtr> _sink_deps;
std::vector<std::atomic_int64_t> _queues_mem_usege;
+ // One dependency per queue; a queue's dependency is blocked while that queue is empty.
+ std::vector<DependencySPtr> _queue_deps;
const int64_t _each_queue_limit;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]