This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e326ebb63e4 [feature](pipelineX) control exchange sink by memory usage (#28814)
e326ebb63e4 is described below

commit e326ebb63e4e07d8ee6595561ab19dc5d411f592
Author: Mryange <[email protected]>
AuthorDate: Mon Dec 25 10:31:50 2023 +0800

    [feature](pipelineX) control exchange sink by memory usage (#28814)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp   |  7 ++++++-
 be/src/pipeline/exec/exchange_sink_operator.h     |  7 ++++++-
 be/src/pipeline/exec/exchange_source_operator.cpp |  1 +
 be/src/vec/runtime/vdata_stream_recvr.cpp         | 16 ++++++++++++----
 be/src/vec/runtime/vdata_stream_recvr.h           |  9 ++++++++-
 be/src/vec/sink/vdata_stream_sender.h             |  5 +++++
 6 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 43bec0bd92d..9f9b36d1cb2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -199,17 +199,22 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         _wait_channel_timer.resize(local_size);
         auto deps_for_channels = AndDependency::create_shared(
                 _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
-        for (auto channel : channels) {
+        auto deps_for_channels_mem_limit = AndDependency::create_shared(
+                _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
+        for (auto* channel : channels) {
             if (channel->is_local()) {
                 _local_channels_dependency[dep_id] = 
channel->get_local_channel_dependency();
                 DCHECK(_local_channels_dependency[dep_id] != nullptr);
                 
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
                 _wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
                         _profile, fmt::format("WaitForLocalExchangeBuffer{}", 
dep_id), timer_name);
+                auto local_recvr = channel->local_recvr();
+                
deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency());
                 dep_id++;
             }
         }
         _exchange_sink_dependency->add_child(deps_for_channels);
+        _exchange_sink_dependency->add_child(deps_for_channels_mem_limit);
     }
     if (p._part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 5df03ea7773..a34c4f4b435 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,9 +96,14 @@ public:
     LocalExchangeChannelDependency(int id, int node_id, QueryContext* 
query_ctx)
             : Dependency(id, node_id, "LocalExchangeChannelDependency", true, 
query_ctx) {}
     ~LocalExchangeChannelDependency() override = default;
-    // TODO(gabriel): blocked by memory
 };
 
+class LocalExchangeMemLimitDependency final : public Dependency {
+    ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency);
+    LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* 
query_ctx)
+            : Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, 
query_ctx) {}
+    ~LocalExchangeMemLimitDependency() override = default;
+};
 class ExchangeSinkLocalState final : public 
PipelineXSinkLocalState<AndDependency> {
     ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
     using Base = PipelineXSinkLocalState<AndDependency>;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 255cb151410..847891104c7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -74,6 +74,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
             profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
+    stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), 
state->get_query_ctx());
     auto* source_dependency = _dependency;
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 56f6c51e684..0dc47363a8e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -354,8 +354,7 @@ VDataStreamRecvr::VDataStreamRecvr(
           _profile(profile),
           _peak_memory_usage_counter(nullptr),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
-          _enable_pipeline(state->enable_pipeline_exec()),
-          _mem_available(std::make_shared<bool>(true)) {
+          _enable_pipeline(state->enable_pipeline_exec()) {
     // DataStreamRecvr may be destructed after the instance execution thread 
ends.
     _mem_tracker =
             std::make_unique<MemTracker>("VDataStreamRecvr:" + 
print_id(_fragment_instance_id));
@@ -506,12 +505,21 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
     _blocks_memory_usage->add(size);
     auto val = _blocks_memory_usage_current_value.fetch_add(size);
     if (val + size > config::exchg_node_buffer_size_bytes) {
-        *_mem_available = false;
+        if (_exchange_sink_mem_limit_dependency) {
+            _exchange_sink_mem_limit_dependency->block();
+        }
     } else {
-        *_mem_available = true;
+        if (_exchange_sink_mem_limit_dependency) {
+            _exchange_sink_mem_limit_dependency->set_ready();
+        }
     }
 }
 
+void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, 
QueryContext* query_ctx) {
+    _exchange_sink_mem_limit_dependency =
+            pipeline::LocalExchangeMemLimitDependency::create_shared(id, 
node_id, query_ctx);
+}
+
 void VDataStreamRecvr::close() {
     if (_is_closed) {
         return;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e0b63459ad2..122a9d763e1 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -42,6 +42,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "runtime/descriptors.h"
+#include "runtime/query_context.h"
 #include "runtime/query_statistics.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
@@ -61,6 +62,7 @@ class RuntimeState;
 namespace pipeline {
 struct ExchangeDataDependency;
 class LocalExchangeChannelDependency;
+class LocalExchangeMemLimitDependency;
 class ExchangeLocalState;
 } // namespace pipeline
 
@@ -130,6 +132,10 @@ public:
     std::shared_ptr<pipeline::LocalExchangeChannelDependency> 
get_local_channel_dependency(
             int sender_id);
 
+    void create_mem_limit_dependency(int id, int node_id, QueryContext* 
query_ctx);
+
+    auto get_mem_limit_dependency() { return 
_exchange_sink_mem_limit_dependency; }
+
 private:
     void update_blocks_memory_usage(int64_t size);
     class PipSenderQueue;
@@ -189,7 +195,8 @@ private:
     std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
             _sender_to_local_channel_dependency;
 
-    std::shared_ptr<bool> _mem_available;
+    // use to limit sink write
+    std::shared_ptr<pipeline::LocalExchangeMemLimitDependency> 
_exchange_sink_mem_limit_dependency;
 };
 
 class ThreadClosure : public google::protobuf::Closure {
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index f59dad266f8..0e727a41f03 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -321,6 +321,11 @@ public:
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
+    auto local_recvr() {
+        DCHECK(is_local());
+        return _local_recvr;
+    }
+
 protected:
     bool _recvr_is_valid() {
         if (_local_recvr && !_local_recvr->is_closed()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to