This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e326ebb63e4 [feature](pipelineX) control exchange sink by memory usage (#28814)
e326ebb63e4 is described below
commit e326ebb63e4e07d8ee6595561ab19dc5d411f592
Author: Mryange <[email protected]>
AuthorDate: Mon Dec 25 10:31:50 2023 +0800
[feature](pipelineX) control exchange sink by memory usage (#28814)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 7 ++++++-
be/src/pipeline/exec/exchange_sink_operator.h | 7 ++++++-
be/src/pipeline/exec/exchange_source_operator.cpp | 1 +
be/src/vec/runtime/vdata_stream_recvr.cpp | 16 ++++++++++++----
be/src/vec/runtime/vdata_stream_recvr.h | 9 ++++++++-
be/src/vec/sink/vdata_stream_sender.h | 5 +++++
6 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 43bec0bd92d..9f9b36d1cb2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -199,17 +199,22 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
- for (auto channel : channels) {
+ auto deps_for_channels_mem_limit = AndDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
+ for (auto* channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] =
channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}",
dep_id), timer_name);
+ auto local_recvr = channel->local_recvr();
+
deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency());
dep_id++;
}
}
_exchange_sink_dependency->add_child(deps_for_channels);
+ _exchange_sink_dependency->add_child(deps_for_channels_mem_limit);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 5df03ea7773..a34c4f4b435 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,9 +96,14 @@ public:
LocalExchangeChannelDependency(int id, int node_id, QueryContext*
query_ctx)
: Dependency(id, node_id, "LocalExchangeChannelDependency", true,
query_ctx) {}
~LocalExchangeChannelDependency() override = default;
- // TODO(gabriel): blocked by memory
};
+class LocalExchangeMemLimitDependency final : public Dependency {
+ ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency);
+ LocalExchangeMemLimitDependency(int id, int node_id, QueryContext*
query_ctx)
+ : Dependency(id, node_id, "LocalExchangeMemLimitDependency", true,
query_ctx) {}
+ ~LocalExchangeMemLimitDependency() override = default;
+};
class ExchangeSinkLocalState final : public
PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 255cb151410..847891104c7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -74,6 +74,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(),
p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
+ stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(),
state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 56f6c51e684..0dc47363a8e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -354,8 +354,7 @@ VDataStreamRecvr::VDataStreamRecvr(
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
- _enable_pipeline(state->enable_pipeline_exec()),
- _mem_available(std::make_shared<bool>(true)) {
+ _enable_pipeline(state->enable_pipeline_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread
ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" +
print_id(_fragment_instance_id));
@@ -506,12 +505,21 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
auto val = _blocks_memory_usage_current_value.fetch_add(size);
if (val + size > config::exchg_node_buffer_size_bytes) {
- *_mem_available = false;
+ if (_exchange_sink_mem_limit_dependency) {
+ _exchange_sink_mem_limit_dependency->block();
+ }
} else {
- *_mem_available = true;
+ if (_exchange_sink_mem_limit_dependency) {
+ _exchange_sink_mem_limit_dependency->set_ready();
+ }
}
}
+void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id,
QueryContext* query_ctx) {
+ _exchange_sink_mem_limit_dependency =
+ pipeline::LocalExchangeMemLimitDependency::create_shared(id,
node_id, query_ctx);
+}
+
void VDataStreamRecvr::close() {
if (_is_closed) {
return;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e0b63459ad2..122a9d763e1 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -42,6 +42,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
+#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
@@ -61,6 +62,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
+class LocalExchangeMemLimitDependency;
class ExchangeLocalState;
} // namespace pipeline
@@ -130,6 +132,10 @@ public:
std::shared_ptr<pipeline::LocalExchangeChannelDependency>
get_local_channel_dependency(
int sender_id);
+ void create_mem_limit_dependency(int id, int node_id, QueryContext*
query_ctx);
+
+ auto get_mem_limit_dependency() { return
_exchange_sink_mem_limit_dependency; }
+
private:
void update_blocks_memory_usage(int64_t size);
class PipSenderQueue;
@@ -189,7 +195,8 @@ private:
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;
- std::shared_ptr<bool> _mem_available;
+ // use to limit sink write
+ std::shared_ptr<pipeline::LocalExchangeMemLimitDependency>
_exchange_sink_mem_limit_dependency;
};
class ThreadClosure : public google::protobuf::Closure {
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index f59dad266f8..0e727a41f03 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -321,6 +321,11 @@ public:
void set_receiver_eof(Status st) { _receiver_status = st; }
+ auto local_recvr() {
+ DCHECK(is_local());
+ return _local_recvr;
+ }
+
protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]