This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d988193d39e [pipelineX](shuffle) block exchange sink by memory usage (#26595)
d988193d39e is described below
commit d988193d39e188b1c909ea2449238ad68b71e5ae
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 9 21:28:22 2023 +0800
[pipelineX](shuffle) block exchange sink by memory usage (#26595)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +-
be/src/pipeline/exec/exchange_sink_buffer.h | 2 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 8 +++--
be/src/pipeline/exec/exchange_sink_operator.h | 18 ++++++++--
be/src/pipeline/exec/exchange_source_operator.cpp | 6 ++--
be/src/vec/runtime/vdata_stream_recvr.cpp | 40 +++++++++++------------
be/src/vec/runtime/vdata_stream_recvr.h | 8 ++---
be/src/vec/sink/vdata_stream_sender.cpp | 7 ++++
8 files changed, 58 insertions(+), 34 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 4f93a39caa6..29933fcdd15 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -62,7 +62,8 @@ namespace pipeline {
template <typename Parent>
ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id,
int send_id, int be_number,
QueryContext* context)
- : _is_finishing(false),
+ : _queue_capacity(0),
+ _is_finishing(false),
_query_id(query_id),
_dest_node_id(dest_node_id),
_sender_id(send_id),
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2b30f6fac70..c04de2a51f3 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -199,6 +199,7 @@ private:
phmap::flat_hash_map<InstanceLoId,
std::queue<TransmitInfo<Parent>,
std::list<TransmitInfo<Parent>>>>
_instance_to_package_queue;
+ size_t _queue_capacity;
// store data in broadcast shuffle
phmap::flat_hash_map<InstanceLoId,
std::queue<BroadcastTransmitInfo<Parent>,
std::list<BroadcastTransmitInfo<Parent>>>>
@@ -237,7 +238,6 @@ private:
std::atomic<int> _total_queue_size = 0;
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
- int _queue_capacity = 0;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
QueryStatistics* _statistics = nullptr;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 44d6b6448ed..25418492954 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -200,8 +200,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_local_channels_dependency[dep_id] =
channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
- _wait_channel_timer[dep_id] =
- ADD_CHILD_TIMER(_profile,
"WaitForLocalExchangeBuffer", timer_name);
+ _wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
+ _profile, fmt::format("WaitForLocalExchangeBuffer{}",
dep_id), timer_name);
dep_id++;
}
}
@@ -213,12 +213,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
new
vectorized::XXHashPartitioner<vectorized::ShuffleChannelIds>(channels.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+ _profile->add_info_string("Partitioner",
+ fmt::format("XXHashPartitioner({})",
_partition_count));
} else if (p._part_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channel_shared_ptrs.size();
_partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
channel_shared_ptrs.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+ _profile->add_info_string("Partitioner",
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
}
return Status::OK();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 2b42d28958a..6b9d3b5e4b1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -134,11 +134,25 @@ private:
class LocalExchangeChannelDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
- LocalExchangeChannelDependency(int id)
- : WriteDependency(id, "LocalExchangeChannelDependency") {}
+ LocalExchangeChannelDependency(int id, std::shared_ptr<bool> mem_available)
+ : WriteDependency(id, "LocalExchangeChannelDependency"),
+ _mem_available(mem_available) {}
~LocalExchangeChannelDependency() override = default;
+ WriteDependency* write_blocked_by() override {
+ if (config::enable_fuzzy_mode && !_is_runnable() &&
+ _should_log(_write_dependency_watcher.elapsed_time())) {
+ LOG(WARNING) << "========Dependency may be blocked by some
reasons: " << name() << " "
+ << id();
+ }
+ return _is_runnable() ? nullptr : this;
+ }
+
void* shared_state() override { return nullptr; }
+
+private:
+ bool _is_runnable() const { return _ready_for_write || *_mem_available; }
+ std::shared_ptr<bool> _mem_available;
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index a37272fd741..3213ed55778 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -60,10 +60,10 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
queues[i]->set_dependency(deps[i]);
source_dependency->add_child(deps[i]);
}
+ static const std::string timer_name =
+ "WaitForDependency[" + source_dependency->name() + "]Time";
+ _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
for (size_t i = 0; i < queues.size(); i++) {
- static const std::string timer_name =
- "WaitForDependency[" + source_dependency->name() + "]Time";
- _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
metrics[i] = ADD_CHILD_TIMER(_runtime_profile,
fmt::format("WaitForData{}", i), timer_name);
}
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 9384d4abbfc..be291828f0f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -103,8 +103,8 @@ Status
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
if (!_is_cancelled && _num_remaining_senders > 0) {
_dependency->block_reading();
}
- for (auto& it : _local_channel_dependency) {
- it->set_ready_for_write();
+ if (_local_channel_dependency) {
+ _local_channel_dependency->set_ready_for_write();
}
}
@@ -349,22 +349,23 @@ VDataStreamRecvr::VDataStreamRecvr(
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
- _enable_pipeline(state->enable_pipeline_exec()) {
+ _enable_pipeline(state->enable_pipeline_exec()),
+ _mem_available(std::make_shared<bool>(true)) {
// DataStreamRecvr may be destructed after the instance execution thread
ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" +
print_id(_fragment_instance_id));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ // Create one queue per sender if is_merging is true.
+ int num_queues = is_merging ? num_senders : 1;
if (state->enable_pipeline_x_exec()) {
- _sender_to_local_channel_dependency.resize(num_senders);
- for (size_t i = 0; i < num_senders; i++) {
+ _sender_to_local_channel_dependency.resize(num_queues);
+ for (size_t i = 0; i < num_queues; i++) {
_sender_to_local_channel_dependency[i] =
-
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id);
+
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id,
+
_mem_available);
}
}
-
- // Create one queue per sender if is_merging is true.
- int num_queues = is_merging ? num_senders : 1;
_sender_queues.reserve(num_queues);
int num_sender_per_queue = is_merging ? 1 : num_senders;
for (int i = 0; i < num_queues; ++i) {
@@ -372,14 +373,7 @@ VDataStreamRecvr::VDataStreamRecvr(
if (_enable_pipeline) {
queue = _sender_queue_pool.add(new PipSenderQueue(this,
num_sender_per_queue, profile));
if (state->enable_pipeline_x_exec()) {
- auto dependencies =
- is_merging
- ? std::vector<std::shared_ptr<
- pipeline::
-
LocalExchangeChannelDependency>> {_sender_to_local_channel_dependency
-
[i]}
- : _sender_to_local_channel_dependency;
- queue->set_local_channel_dependency(dependencies);
+
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
}
} else {
queue = _sender_queue_pool.add(new SenderQueue(this,
num_sender_per_queue, profile));
@@ -449,9 +443,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
std::shared_ptr<pipeline::LocalExchangeChannelDependency>
VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
- DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id);
- DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr);
- return _sender_to_local_channel_dependency[sender_id];
+ DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] !=
nullptr);
+ return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0];
}
bool VDataStreamRecvr::ready_to_read() {
@@ -504,7 +497,12 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
- _blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
+ auto val = _blocks_memory_usage_current_value.fetch_add(size);
+ if (val + size > config::exchg_node_buffer_size_bytes) {
+ *_mem_available = false;
+ } else {
+ *_mem_available = true;
+ }
}
void VDataStreamRecvr::close() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 2f5c88301e2..e31b433491a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -186,6 +186,8 @@ private:
bool _enable_pipeline;
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;
+
+ std::shared_ptr<bool> _mem_available;
};
class ThreadClosure : public google::protobuf::Closure {
@@ -204,8 +206,7 @@ public:
virtual ~SenderQueue();
void set_local_channel_dependency(
-
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>&
- local_channel_dependency) {
+ std::shared_ptr<pipeline::LocalExchangeChannelDependency>
local_channel_dependency) {
_local_channel_dependency = local_channel_dependency;
}
@@ -255,8 +256,7 @@ protected:
std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>>
_local_closure;
std::shared_ptr<pipeline::ExchangeDataDependency> _dependency = nullptr;
- std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
- _local_channel_dependency;
+ std::shared_ptr<pipeline::LocalExchangeChannelDependency>
_local_channel_dependency;
};
class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 6c64cf8db04..f5397e831d3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -460,6 +460,13 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
RETURN_IF_ERROR(_partitioner->prepare(state, _row_desc));
+ if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ _profile->add_info_string("Partitioner",
+ fmt::format("XXHashPartitioner({})",
_partition_count));
+ } else if (_part_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ _profile->add_info_string("Partitioner",
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
+ }
}
_bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]