This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 41bb04b9e9e refactor logic of revoking and low memory mode in local
exchange operator (#41264)
41bb04b9e9e is described below
commit 41bb04b9e9e237fd5680674d924228be7aa3b0e6
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Sep 26 10:12:36 2024 +0800
refactor logic of revoking and low memory mode in local exchange operator
(#41264)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/dependency.h | 21 ++++++-
be/src/pipeline/exec/operator.h | 37 ++++++++++++-
.../exec/partitioned_aggregation_sink_operator.cpp | 29 ++++++++--
.../exec/partitioned_aggregation_sink_operator.h | 8 ++-
.../exec/partitioned_hash_join_probe_operator.cpp | 6 +-
.../exec/partitioned_hash_join_probe_operator.h | 3 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 59 ++++++++++++++------
.../exec/partitioned_hash_join_sink_operator.h | 11 ++--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 19 ++++---
be/src/pipeline/exec/spill_sort_sink_operator.h | 7 ++-
be/src/pipeline/exec/spill_utils.h | 32 +++++++++++
.../local_exchange_sink_operator.cpp | 8 ++-
be/src/pipeline/local_exchange/local_exchanger.cpp | 49 +++++++++++++----
be/src/pipeline/pipeline_task.cpp | 9 ++-
be/src/pipeline/pipeline_task.h | 2 +-
be/src/runtime/query_context.cpp | 64 ++++++++++++++++++++++
be/src/runtime/query_context.h | 7 ++-
.../workload_group/workload_group_manager.cpp | 38 ++++++-------
18 files changed, 325 insertions(+), 84 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 8cb479ccbb0..5f030dda5d2 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -26,6 +26,7 @@
#include <thread>
#include <utility>
+#include "common/config.h"
#include "common/logging.h"
#include "concurrentqueue.h"
#include "gutil/integral_types.h"
@@ -832,6 +833,7 @@ public:
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<int64_t> mem_usage = 0;
+ size_t _buffer_mem_limit = config::local_exchange_buffer_mem_limit;
// We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
virtual void create_dependencies(int local_exchange_id) {
@@ -875,7 +877,7 @@ public:
void sub_mem_usage(int channel_id, size_t delta) {
mem_trackers[channel_id]->release(delta); }
virtual void add_total_mem_usage(size_t delta, int channel_id) {
- if (mem_usage.fetch_add(delta) + delta >
config::local_exchange_buffer_mem_limit) {
+ if (mem_usage.fetch_add(delta) + delta > _buffer_mem_limit) {
sink_deps.front()->block();
}
}
@@ -884,10 +886,15 @@ public:
auto prev_usage = mem_usage.fetch_sub(delta);
DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << "
delta: " << delta
<< " channel_id: " << channel_id;
- if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) {
+ if (prev_usage - delta <= _buffer_mem_limit) {
sink_deps.front()->set_ready();
}
}
+
+ virtual void set_low_memory_mode() {
+ _buffer_mem_limit =
+ std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10
* 1024 * 1024);
+ }
};
struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
@@ -933,6 +940,14 @@ struct LocalMergeExchangeSharedState : public
LocalExchangeSharedState {
source_deps[channel_id]->set_ready();
}
+ void set_low_memory_mode() override {
+ _buffer_mem_limit =
+ std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10
* 1024 * 1024);
+ DCHECK(!_queues_mem_usage.empty());
+ _each_queue_limit =
+ std::max<int64_t>(64 * 1024, _buffer_mem_limit /
_queues_mem_usage.size());
+ }
+
Dependency* get_sink_dep_by_channel_id(int channel_id) override {
return sink_deps[channel_id].get();
}
@@ -943,7 +958,7 @@ struct LocalMergeExchangeSharedState : public
LocalExchangeSharedState {
private:
std::vector<std::atomic_int64_t> _queues_mem_usage;
- const int64_t _each_queue_limit;
+ int64_t _each_queue_limit;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 3a644eb4f02..1d2dbcc3592 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -32,6 +32,7 @@
#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
#include "pipeline/local_exchange/local_exchanger.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/query_context.h"
@@ -113,7 +114,11 @@ public:
return state->minimum_operator_memory_required_bytes();
}
- virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+ virtual Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>&
spill_context) {
+ return Status::OK();
+ }
+
[[nodiscard]] virtual bool require_data_distribution() const { return
false; }
OperatorPtr child() { return _child; }
[[nodiscard]] bool followed_by_shuffled_join() const { return
_followed_by_shuffled_join; }
@@ -603,6 +608,10 @@ public:
_spill_write_wait_io_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime",
1);
_spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadWaitIOTime", 1);
+ _spill_max_rows_of_partition =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillMaxRowsOfPartition", TUnit::UNIT, 1);
+ _spill_min_rows_of_partition =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillMinRowsOfPartition", TUnit::UNIT, 1);
return Status::OK();
}
@@ -611,6 +620,25 @@ public:
return dependencies;
}
+ void update_max_min_rows_counter() {
+ int64_t max_rows = 0;
+ int64_t min_rows = std::numeric_limits<int64_t>::max();
+
+ for (auto rows : _rows_in_partitions) {
+ if (rows > max_rows) {
+ max_rows = rows;
+ }
+ if (rows < min_rows) {
+ min_rows = rows;
+ }
+ }
+
+ COUNTER_SET(_spill_max_rows_of_partition, max_rows);
+ COUNTER_SET(_spill_min_rows_of_partition, min_rows);
+ }
+
+ std::vector<int64_t> _rows_in_partitions;
+
RuntimeProfile::Counter* _spill_timer = nullptr;
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
@@ -619,6 +647,8 @@ public:
RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+ RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
+ RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
};
class OperatorXBase : public OperatorBase {
@@ -718,9 +748,10 @@ public:
return (_child and !is_source()) ? _child->revocable_mem_size(state) :
0;
}
- Status revoke_memory(RuntimeState* state) override {
+ Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>& spill_context)
override {
if (_child and !is_source()) {
- return _child->revoke_memory(state);
+ return _child->revoke_memory(state, spill_context);
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 39311a62dcc..6d84b8e8bb5 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -18,6 +18,7 @@
#include "partitioned_aggregation_sink_operator.h"
#include <cstdint>
+#include <limits>
#include <memory>
#include "aggregation_sink_operator.h"
@@ -26,6 +27,8 @@
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
@@ -60,6 +63,8 @@ Status
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
}
+ _rows_in_partitions.assign(Base::_shared_state->partition_count, 0);
+
_spill_dependency = Dependency::create_shared(parent.operator_id(),
parent.node_id(),
"AggSinkSpillDependency",
true);
state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -128,6 +133,8 @@ void
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
+
+ update_max_min_rows_counter();
}
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int
operator_id,
@@ -172,7 +179,7 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
if (eos) {
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
- RETURN_IF_ERROR(revoke_memory(state));
+ RETURN_IF_ERROR(revoke_memory(state, nullptr));
} else {
for (auto& partition :
local_state._shared_state->spill_partitions) {
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
@@ -184,6 +191,10 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
+ } else if (local_state._shared_state->is_spilled) {
+ if (revocable_mem_size(state) >=
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+ return revoke_memory(state, nullptr);
+ }
}
if (local_state._runtime_state) {
auto* sink_local_state =
local_state._runtime_state->get_sink_local_state();
@@ -191,9 +202,10 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
}
return Status::OK();
}
-Status PartitionedAggSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkOperatorX::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& local_state = get_local_state(state);
- return local_state.revoke_memory(state);
+ return local_state.revoke_memory(state, spill_context);
}
size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state)
const {
@@ -240,7 +252,8 @@ size_t
PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
return _agg_sink_operator->get_reserve_mem_size(runtime_state);
}
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkLocalState::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
<< Base::_parent->node_id()
@@ -279,7 +292,7 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, _shared_state->shared_from_this(),
- [this, &parent, state, query_id, size_to_revoke, submit_timer] {
+ [this, &parent, state, query_id, size_to_revoke, spill_context,
submit_timer] {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_sink "
@@ -308,8 +321,12 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
Base::_dependency->set_ready_to_read();
_finish_dependency->set_ready();
}
- Base::_spill_dependency->Dependency::set_ready();
state->get_query_ctx()->decrease_revoking_tasks_count();
+ Base::_spill_dependency->Dependency::set_ready();
+
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
}};
auto* runtime_state = _runtime_state.get();
auto* agg_data =
parent._agg_sink_operator->get_agg_data(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 756b686a5b3..0027754cde0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <limits>
#include <memory>
#include "aggregation_sink_operator.h"
@@ -45,7 +46,7 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get();
}
- Status revoke_memory(RuntimeState* state);
+ Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
Status setup_in_memory_agg_op(RuntimeState* state);
@@ -102,6 +103,7 @@ public:
for (int i = 0; i < Base::_shared_state->partition_count &&
!state->is_cancelled();
++i) {
if (spill_infos[i].keys_.size() >= spill_batch_rows) {
+ _rows_in_partitions[i] += spill_infos[i].keys_.size();
status = _spill_partition(
state, context,
Base::_shared_state->spill_partitions[i],
spill_infos[i].keys_, spill_infos[i].values_,
nullptr, false);
@@ -117,6 +119,7 @@ public:
auto spill_null_key_data =
(hash_null_key_data && i ==
Base::_shared_state->partition_count - 1);
if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
+ _rows_in_partitions[i] += spill_infos[i].keys_.size();
status = _spill_partition(state, context,
Base::_shared_state->spill_partitions[i],
spill_infos[i].keys_,
spill_infos[i].values_,
spill_null_key_data
@@ -338,7 +341,8 @@ public:
}
size_t revocable_mem_size(RuntimeState* state) const override;
- Status revoke_memory(RuntimeState* state) override;
+ Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>& spill_context)
override;
size_t get_reserve_mem_size(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f95cca1f6af..fa9e3ff23b7 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -167,6 +167,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
MonotonicStopWatch submit_timer;
submit_timer.start();
+
auto spill_func = [query_id, state, submit_timer, spill_size_threshold,
this] {
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_spill_probe_timer);
@@ -791,7 +792,8 @@ size_t
PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
return mem_size;
}
-Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinProbeOperatorX::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& local_state = get_local_state(state);
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe
node: " << node_id()
<< ", task: " << state->task_id() << ", child eos: " <<
local_state._child_eos;
@@ -806,7 +808,7 @@ Status
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
if (_child) {
- return _child->revoke_memory(state);
+ return _child->revoke_memory(state, nullptr);
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b611fb661af..621681ca4cf 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -186,7 +186,8 @@ public:
return _inner_probe_operator->require_data_distribution();
}
- Status revoke_memory(RuntimeState* state) override;
+ Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>& spill_context)
override;
private:
Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f2375632d2a..5323d74341a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -20,9 +20,11 @@
#include <glog/logging.h>
#include <algorithm>
+#include <memory>
#include "common/logging.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
#include "util/mem_info.h"
@@ -41,6 +43,8 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_shared_state->partitioned_build_blocks.resize(p._partition_count);
_shared_state->spilled_streams.resize(p._partition_count);
+ _rows_in_partitions.assign(p._partition_count, 0);
+
_spill_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"HashJoinBuildSpillDependency", true);
state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -125,16 +129,17 @@ size_t
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
return size_to_reserve;
}
-Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState*
state) {
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child->row_desc();
const auto num_slots = row_desc.num_slots();
vectorized::Block build_block;
size_t block_old_mem = 0;
- auto inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
+ auto* inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state_) {
- auto inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ auto* inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
build_block = inner_sink_state->_build_side_mutable_block.to_block();
block_old_mem = build_block.allocated_bytes();
}
@@ -142,6 +147,9 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
if (build_block.rows() <= 1) {
LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
<< ", task: " << state->task_id();
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
return Status::OK();
}
@@ -243,7 +251,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
_spill_dependency->set_ready();
};
- auto exception_catch_func = [spill_func, this]() mutable {
+ auto exception_catch_func = [spill_func, spill_context, this]() mutable {
SCOPED_TIMER(_spill_timer);
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION(spill_func());
@@ -256,6 +264,10 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
_spill_status_ok = false;
_spill_dependency->set_ready();
}
+
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
};
auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
@@ -273,7 +285,8 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
return thread_pool->submit(std::move(spill_runnable));
}
-Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinSinkLocalState::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
@@ -282,7 +295,7 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
if (!_shared_state->need_to_spill) {
profile()->add_info_string("Spilled", "true");
_shared_state->need_to_spill = true;
- return _revoke_unpartitioned_block(state);
+ return _revoke_unpartitioned_block(state, spill_context);
}
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
@@ -317,8 +330,7 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, _shared_state->shared_from_this(),
- [this, state, query_id, spilling_stream, i, submit_timer] {
- SCOPED_TIMER(_spill_timer);
+ [this, query_id, spilling_stream, i, submit_timer,
spill_context] {
DBUG_EXECUTE_IF(
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -331,7 +343,8 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
SCOPED_TIMER(_spill_build_timer);
auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i,
spilling_stream));
+ RETURN_IF_CATCH_EXCEPTION(
+ _spill_to_disk(i, spilling_stream,
spill_context));
return Status::OK();
}();
@@ -341,8 +354,6 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
_spill_status_ok = false;
_spill_status = std::move(status);
}
-
- state->get_query_ctx()->decrease_revoking_tasks_count();
});
if (st.ok()) {
st = spill_io_pool->submit(std::move(spill_runnable));
@@ -368,6 +379,10 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}
});
_dependency->set_ready_to_read();
+
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
}
return Status::OK();
}
@@ -408,13 +423,17 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
}
RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(in_block,
partition_indexes[i].data(),
partition_indexes[i].data() + count));
+ _rows_in_partitions[i] += count;
}
+ update_max_min_rows_counter();
+
return Status::OK();
}
void PartitionedHashJoinSinkLocalState::_spill_to_disk(
- uint32_t partition_index, const vectorized::SpillStreamSPtr&
spilling_stream) {
+ uint32_t partition_index, const vectorized::SpillStreamSPtr&
spilling_stream,
+ const std::shared_ptr<SpillContext>& spill_context) {
auto& partitioned_block =
_shared_state->partitioned_build_blocks[partition_index];
if (_spill_status_ok) {
@@ -436,6 +455,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
if (num == 1) {
std::unique_lock<std::mutex> lock(_spill_lock);
+ _state->get_query_ctx()->decrease_revoking_tasks_count();
_spill_dependency->set_ready();
if (_child_eos) {
VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) <<
", hash join sink "
@@ -449,6 +469,10 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
});
_dependency->set_ready_to_read();
}
+
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
}
}
@@ -575,7 +599,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
<< _inner_sink_operator->get_memory_usage(
local_state._shared_state->inner_runtime_state.get());
} else {
- return revoke_memory(state);
+ return revoke_memory(state, nullptr);
}
std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -594,9 +618,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
if (need_to_spill) {
RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0,
rows));
if (eos) {
- return revoke_memory(state);
+ return revoke_memory(state, nullptr);
} else if (revocable_mem_size(state) >
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
- return revoke_memory(state);
+ return revoke_memory(state, nullptr);
}
} else {
if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -630,10 +654,11 @@ size_t
PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
return local_state.revocable_mem_size(state);
}
-Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinSinkOperatorX::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- return local_state.revoke_memory(state);
+ return local_state.revoke_memory(state, spill_context);
}
size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState*
state) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index b2c79967b97..a2d75cf2f9b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -43,7 +43,7 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
- Status revoke_memory(RuntimeState* state);
+ Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
size_t revocable_mem_size(RuntimeState* state) const;
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
@@ -52,12 +52,14 @@ protected:
:
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
void _spill_to_disk(uint32_t partition_index,
- const vectorized::SpillStreamSPtr& spilling_stream);
+ const vectorized::SpillStreamSPtr& spilling_stream,
+ const std::shared_ptr<SpillContext>& spill_context);
Status _partition_block(RuntimeState* state, vectorized::Block* in_block,
size_t begin,
size_t end);
- Status _revoke_unpartitioned_block(RuntimeState* state);
+ Status _revoke_unpartitioned_block(RuntimeState* state,
+ const std::shared_ptr<SpillContext>&
spill_context);
friend class PartitionedHashJoinSinkOperatorX;
@@ -102,7 +104,8 @@ public:
size_t revocable_mem_size(RuntimeState* state) const override;
- Status revoke_memory(RuntimeState* state) override;
+ Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>& spill_context)
override;
size_t get_reserve_mem_size(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index e55982bcb3b..e83ad897257 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -131,9 +131,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
return _sort_sink_operator->open(state);
}
-Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
+ const
std::shared_ptr<SpillContext>& spill_context) {
auto& local_state = get_local_state(state);
- return local_state.revoke_memory(state);
+ return local_state.revoke_memory(state, spill_context);
}
size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
@@ -163,7 +164,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
if (eos) {
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
- RETURN_IF_ERROR(revoke_memory(state));
+ RETURN_IF_ERROR(revoke_memory(state, nullptr));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
@@ -178,7 +179,8 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
return Status::OK();
}
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
+ const
std::shared_ptr<SpillContext>& spill_context) {
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
profile()->add_info_string("Spilled", "true");
@@ -234,14 +236,13 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
}
_spilling_stream.reset();
+ state->get_query_ctx()->decrease_revoking_tasks_count();
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_spill_dependency->Dependency::set_ready();
}
-
- state->get_query_ctx()->decrease_revoking_tasks_count();
}};
_shared_state->sink_status =
@@ -273,7 +274,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state) {
return Status::OK();
};
- auto exception_catch_func = [this, query_id, spill_func]() {
+ auto exception_catch_func = [this, query_id, spill_context, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
spill_sort_sink "
@@ -284,6 +285,10 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
_shared_state->sink_status = [&]() {
RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
}();
+
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
};
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index e74b5d2a414..173e3c7847c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -17,6 +17,8 @@
#pragma once
+#include <memory>
+
#include "operator.h"
#include "sort_sink_operator.h"
@@ -38,7 +40,7 @@ public:
Dependency* finishdependency() override { return _finish_dependency.get();
}
Status setup_in_memory_sort_op(RuntimeState* state);
- Status revoke_memory(RuntimeState* state);
+ Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
private:
void _init_counters();
@@ -86,7 +88,8 @@ public:
size_t revocable_mem_size(RuntimeState* state) const override;
- Status revoke_memory(RuntimeState* state) override;
+ Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>& spill_context)
override;
using DataSinkOperatorX<LocalStateType>::node_id;
using DataSinkOperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index 925e7df44e6..086a6881fcd 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -17,6 +17,13 @@
#pragma once
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <functional>
+#include <utility>
+
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
@@ -28,6 +35,31 @@
namespace doris::pipeline {
using SpillPartitionerType =
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
+struct SpillContext {
+ std::atomic_int running_tasks_count;
+ TUniqueId query_id;
+ std::function<void()> all_tasks_finished_callback;
+
+ SpillContext(int running_tasks_count_, TUniqueId query_id_,
+ std::function<void()> all_tasks_finished_callback_)
+ : running_tasks_count(running_tasks_count_),
+ query_id(std::move(query_id_)),
+
all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
+
+ ~SpillContext() {
+ LOG_IF(WARNING, running_tasks_count.load() != 0)
+ << "query: " << print_id(query_id)
+ << " not all spill tasks finished, remaining tasks: " <<
running_tasks_count.load();
+ }
+
+ void on_task_finished() {
+ auto count = running_tasks_count.fetch_sub(1);
+ if (count == 1) {
+ all_tasks_finished_callback();
+ }
+ }
+};
+
class SpillRunnable : public Runnable {
public:
SpillRunnable(RuntimeState* state, const
std::shared_ptr<BasicSharedState>& shared_state,
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 19c37f3649b..ec0d1a40f5e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -28,7 +28,7 @@ LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() =
default;
std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
auto deps = Base::dependencies();
- auto dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
+ auto* dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
if (dep != nullptr) {
deps.push_back(dep);
}
@@ -136,7 +136,13 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
+
+ if (state->get_query_ctx()->low_memory_mode()) {
+ local_state._shared_state->set_low_memory_mode();
+ }
+
RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos,
local_state));
+
local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
// If all exchange sources ended due to limit reached, current task should
also finish
if (local_state._exchanger->_running_source_operators == 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index f4630f328bb..50f359922a1 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -43,13 +43,17 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int
channel_id,
allocated_bytes = block->data_block.allocated_bytes();
}
std::unique_lock l(_m);
- local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
- !std::is_same_v<PartitionedBlock,
BlockType> &&
-
!std::is_same_v<BroadcastBlock, BlockType>);
+ constexpr bool update_mem_usage = !std::is_same_v<PartitionedBlock,
BlockType> &&
+ !std::is_same_v<BroadcastBlock,
BlockType>;
+ local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
update_mem_usage);
+ if constexpr (update_mem_usage) {
+ local_state._mem_tracker->consume(allocated_bytes);
+ }
if (_data_queue[channel_id].enqueue(std::move(block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {
local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
+ local_state._mem_tracker->release(allocated_bytes);
// `enqueue(block)` return false iff this queue's source operator is
already closed so we
// just unref the block.
if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
@@ -77,11 +81,13 @@ bool
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
if (_data_queue[channel_id].try_dequeue(block)) {
if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
std::is_same_v<BroadcastBlock, BlockType>) {
- local_state._shared_state->sub_mem_usage(channel_id,
-
block.first->data_block.allocated_bytes());
+ const auto bytes = block.first->data_block.allocated_bytes();
+ local_state._shared_state->sub_mem_usage(channel_id, bytes);
+ local_state._mem_tracker->release(bytes);
} else {
- local_state._shared_state->sub_mem_usage(channel_id,
-
block->data_block.allocated_bytes());
+ const auto bytes = block->data_block.allocated_bytes();
+ local_state._shared_state->sub_mem_usage(channel_id, bytes);
+ local_state._mem_tracker->release(bytes);
data_block->swap(block->data_block);
block->unref(local_state._shared_state,
data_block->allocated_bytes(), channel_id);
DCHECK_EQ(block->ref_value(), 0);
@@ -94,11 +100,15 @@ bool
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
if (_data_queue[channel_id].try_dequeue(block)) {
if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
std::is_same_v<BroadcastBlock, BlockType>) {
+ const auto bytes = block.first->data_block.allocated_bytes();
local_state._shared_state->sub_mem_usage(channel_id,
block.first->data_block.allocated_bytes());
+ local_state._mem_tracker->release(bytes);
} else {
+ const auto bytes = block->data_block.allocated_bytes();
local_state._shared_state->sub_mem_usage(channel_id,
block->data_block.allocated_bytes());
+ local_state._mem_tracker->release(bytes);
data_block->swap(block->data_block);
block->unref(local_state._shared_state,
data_block->allocated_bytes(), channel_id);
DCHECK_EQ(block->ref_value(), 0);
@@ -128,6 +138,8 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
in_block, local_state));
}
+
local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
return Status::OK();
}
@@ -200,8 +212,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (new_block_wrapper->data_block.empty()) {
return Status::OK();
}
-
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
- local_state._channel_id);
+
+ const auto block_bytes = new_block_wrapper->data_block.allocated_bytes();
+ local_state._shared_state->add_total_mem_usage(block_bytes,
local_state._channel_id);
+ local_state._mem_tracker->consume(block_bytes);
auto bucket_seq_to_instance_idx =
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
if (get_type() == ExchangeType::HASH_SHUFFLE) {
@@ -272,6 +286,8 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
auto channel_id = (local_state._channel_id++) % _num_partitions;
_enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper));
+
local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
return Status::OK();
}
@@ -316,6 +332,8 @@ Status PassToOneExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
BlockWrapperSPtr wrapper =
BlockWrapper::create_shared(std::move(new_block));
_enqueue_data_and_set_ready(0, local_state, std::move(wrapper));
+
local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
return Status::OK();
}
@@ -346,6 +364,8 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state,
vectorized::Block* in_
if (eos) {
local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready();
}
+
+
local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
return Status::OK();
}
@@ -424,8 +444,9 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
}
new_block.swap(*in_block);
auto wrapper = BlockWrapper::create_shared(std::move(new_block));
-
local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(),
- local_state._channel_id);
+ const auto block_bytes = wrapper->data_block.allocated_bytes();
+ local_state._shared_state->add_total_mem_usage(block_bytes,
local_state._channel_id);
+ local_state._mem_tracker->consume(block_bytes);
wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
_enqueue_data_and_set_ready(i, local_state, {wrapper, {0,
wrapper->data_block.rows()}});
@@ -475,6 +496,7 @@ Status
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
_enqueue_data_and_set_ready(channel_id, local_state,
BlockWrapper::create_shared(std::move(new_block)));
+
local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
return Status::OK();
}
@@ -494,7 +516,10 @@ Status
AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz
std::iota(channel_ids.begin() + i, channel_ids.end(), 0);
}
}
- return _split_rows(state, channel_ids.data(), block, local_state);
+
+ RETURN_IF_ERROR(_split_rows(state, channel_ids.data(), block,
local_state));
+
local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+ return Status::OK();
}
Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 356feea9c36..4a1e4536373 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -553,11 +553,14 @@ size_t PipelineTask::get_revocable_size() const {
return revocable_size;
}
-Status PipelineTask::revoke_memory() {
+Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) { // Revokes the sink's memory when it holds at least MIN_SPILL_WRITE_BATCH_MEM; otherwise reports this task as finished right away.
if (_sink->revocable_mem_size(_state) >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- RETURN_IF_ERROR(_sink->revoke_memory(_state));
+ RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context)); // The sink receives the context; presumably it calls on_task_finished() when its spill completes — confirm in the sink operators.
+ } else if (spill_context) { // Below the threshold nothing is revoked, so decrement the shared countdown here.
+ spill_context->on_task_finished();
}
- return _root->revoke_memory(_state);
+
+ return Status::OK(); // After this change only the sink is asked to revoke; the removed line shows _root was revoked before.
}
void PipelineTask::wake_up() {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 943366b4b70..633cca93f46 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -248,7 +248,7 @@ public:
}
[[nodiscard]] size_t get_revocable_size() const;
- [[nodiscard]] Status revoke_memory();
+ [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>&
spill_context);
void add_spill_dependency(Dependency* dependency) {
_spill_dependencies.emplace_back(dependency);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 6a0cf20a4dd..45fd5562a93 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -23,13 +23,17 @@
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
+#include <algorithm>
#include <exception>
#include <memory>
#include <mutex>
+#include <shared_mutex>
#include <sstream>
#include <utility>
+#include <vector>
#include "common/logging.h"
+#include "common/status.h"
#include "olap/olap_common.h"
#include "pipeline/dependency.h"
#include "pipeline/pipeline_fragment_context.h"
@@ -442,6 +446,66 @@ size_t QueryContext::get_revocable_size() const {
return revocable_size;
}
+Status QueryContext::revoke_memory() { // Picks the tasks with the largest revocable size until about 20% of the query memory limit is covered, then asks each chosen task to revoke memory.
+ std::vector<std::pair<size_t, pipeline::PipelineTask*>> tasks; // (revocable size, task) pairs gathered from all live fragments.
+ std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> fragments; // Holds a strong reference to each live fragment context until this function returns; presumably this keeps the raw task pointers valid — confirm.
+ for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
+ auto fragment_ctx = fragment_wptr.lock();
+ if (!fragment_ctx) { // Fragment context already destroyed; nothing to collect from it.
+ continue;
+ }
+
+ auto tasks_of_fragment = fragment_ctx->get_revocable_tasks();
+ for (auto* task : tasks_of_fragment) {
+ tasks.emplace_back(task->get_revocable_size(), task);
+ }
+ fragments.emplace_back(std::move(fragment_ctx));
+ }
+
+ std::sort(tasks.begin(), tasks.end(), [](auto&& l, auto&& r) { return
l.first > r.first; }); // Descending by revocable size, so the largest tasks are considered first.
+
+ const auto mem_limit = query_mem_tracker->limit();
+ const auto target_revoking_size = mem_limit * 0.2; // Target is 20% of the tracker limit; the 0.2 factor makes this a double.
+ size_t revoked_size = 0; // Sum of the revocable sizes of the chosen tasks (sizes sampled above, not bytes actually freed).
+
+ std::vector<pipeline::PipelineTask*> chosen_tasks;
+ for (auto&& [revocable_size, task] : tasks) { // The task is added before the check, so at least one task is chosen whenever any exist.
+ chosen_tasks.emplace_back(task);
+
+ revoked_size += revocable_size;
+ if (revoked_size >= target_revoking_size) {
+ break;
+ }
+ }
+
+ std::weak_ptr<QueryContext> this_ctx = shared_from_this(); // Captured weakly: the callback below does nothing if this QueryContext is already gone.
+ auto spill_context =
+ std::make_shared<pipeline::SpillContext>(chosen_tasks.size(),
_query_id, [this_ctx] {
+ auto query_context = this_ctx.lock();
+ if (!query_context) {
+ return;
+ }
+
+ LOG(INFO) << "query: " << print_id(query_context->_query_id)
+ << " all revoking tasks done, resumt it.";
+ query_context->set_memory_sufficient(true);
+ }); // NOTE(review): if chosen_tasks is empty the countdown starts at 0 and this callback is never triggered — confirm callers only reach here with revocable tasks.
+
+ for (auto* task : chosen_tasks) {
+ RETURN_IF_ERROR(task->revoke_memory(spill_context)); // NOTE(review): an early return skips the remaining chosen tasks, so the countdown cannot reach zero — confirm the caller handles the error (e.g. cancels the query).
+ }
+
+ LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: "
<< revoked_size
+ << ", target_size: " << target_revoking_size
+ << ", tasks count: " << chosen_tasks.size() << "/" <<
tasks.size();
+
+ return Status::OK();
+}
+
+void QueryContext::decrease_revoking_tasks_count() { // Out-of-line definition (moved from the header by this patch); atomically decrements _revoking_tasks_count by one.
+ _revoking_tasks_count.fetch_sub(1);
+}
+
std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const
{
std::vector<pipeline::PipelineTask*> tasks;
for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 96b18eedfb9..b74b835af63 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -22,6 +22,7 @@
#include <gen_cpp/Types_types.h>
#include <atomic>
+#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
@@ -191,6 +192,8 @@ public:
std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
+ Status revoke_memory();
+
void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
std::shared_ptr<QueryStatistics> get_query_statistics();
@@ -228,7 +231,7 @@ public:
void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1);
}
- void decrease_revoking_tasks_count() { _revoking_tasks_count.fetch_sub(1);
}
+ void decrease_revoking_tasks_count();
int get_revoking_tasks_count() const { return
_revoking_tasks_count.load(); }
@@ -354,6 +357,8 @@ private:
bool _is_pipeline = false;
bool _is_nereids = false;
std::atomic<int> _running_big_mem_op_num = 0;
+
+ std::mutex _revoking_tasks_mutex;
std::atomic<int> _revoking_tasks_count = 0;
// A token used to submit olap scanner to the "_limited_scan_thread_pool",
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 290d1fe1d5b..92235c1ded7 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -17,13 +17,17 @@
#include "workload_group_manager.h"
+#include <glog/logging.h>
+
#include <algorithm>
#include <memory>
#include <mutex>
#include <unordered_map>
+#include "common/status.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "pipeline/task_scheduler.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "util/mem_info.h"
@@ -198,8 +202,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
weighted_memory_limit_ratio);
- // LOG_EVERY_T(INFO, 60) << debug_msg;
- LOG(INFO) << debug_msg;
+ LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
auto wg_mem_limit = wg.second->memory_limit();
auto wg_weighted_mem_limit = int64_t(wg_mem_limit *
weighted_memory_limit_ratio);
@@ -299,8 +302,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
}
// During memory insufficent stage, we already set every query's
memlimit, so that the flag is useless any more.
wg.second->update_memory_sufficent(true);
- // LOG_EVERY_T(INFO, 60) << debug_msg;
- LOG(INFO) << debug_msg;
+ LOG_EVERY_T(INFO, 60) << debug_msg;
}
}
@@ -348,6 +350,7 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<QueryContext>& que
* strategy 5: If any query exceed process's memlimit and cache is zero, then
do spill disk or cancel it.
*/
void WorkloadGroupMgr::handle_paused_queries() {
+ const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
std::unique_lock<std::mutex> lock(_paused_queries_lock);
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
@@ -376,9 +379,13 @@ void WorkloadGroupMgr::handle_paused_queries() {
continue;
}
if (query_ctx->is_cancelled()) {
- LOG(INFO) << "query: " << print_id(query_ctx->query_id())
- << "was canceled, remove from paused list";
- query_it = queries_list.erase(query_it);
+ /// Memory may not be released immediately after a query is
canceled.
+ /// So here wait for a while.
+ if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ << " was canceled, remove from paused list";
+ query_it = queries_list.erase(query_it);
+ }
continue;
}
if
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -401,8 +408,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
// Should not put the query back to task scheduler
immediately, because when wg's memory not sufficient,
// and then set wg's flag, other query may not free memory
very quickly.
- if (query_it->elapsed_time() > 1000) {
+ if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
// set wg's memory to insufficent, then add it back to
task scheduler to run.
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< " will be resume.";
query_ctx->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
} else {
@@ -437,6 +445,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
0.001 &&
query_it->cache_ratio_ > 0.001) {
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ << " will be resume after cache adjust.";
query_ctx->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
continue;
@@ -479,17 +489,7 @@ bool
WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
}
} else {
SCOPED_ATTACH_TASK(query_ctx.get());
- // TODO, should spill the task that has max memory, not all
- for (auto* task : revocable_tasks) {
- auto st = task->revoke_memory();
- if (!st.ok()) {
- query_ctx->cancel(st);
- break;
- }
- }
- LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
- << revocable_tasks.size()
- << " tasks to revoke memory, revocable size: " <<
revocable_size;
+ RETURN_IF_ERROR(query_ctx->revoke_memory());
}
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]