This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new f3d4f8d98b3 add low memory mode in scan operator (#40662)
f3d4f8d98b3 is described below
commit f3d4f8d98b37725ace0d62f9678c92f696ba56f4
Author: yiguolei <[email protected]>
AuthorDate: Wed Sep 11 15:04:02 2024 +0800
add low memory mode in scan operator (#40662)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/exec/operator.h | 2 +
be/src/pipeline/pipeline_task.cpp | 1 +
be/src/pipeline/task_scheduler.cpp | 3 +-
be/src/runtime/query_context.h | 49 +++++++++++++++
.../workload_group/workload_group_manager.cpp | 10 +++
be/src/vec/exec/scan/scanner_context.cpp | 72 ++++++++++++++--------
be/src/vec/exec/scan/scanner_context.h | 16 ++++-
be/src/vec/exec/scan/scanner_scheduler.cpp | 9 ++-
8 files changed, 134 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 265cf31f648..c6b49317375 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -198,6 +198,8 @@ public:
void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
+ bool low_memory_mode() { return
_state->get_query_ctx()->low_memory_mode(); }
+
protected:
friend class OperatorXBase;
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index fe44f8a4b3f..8051bb5d4e6 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -388,6 +388,7 @@ Status PipelineTask::execute(bool* eos) {
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << "
failed: " << st.to_string()
<< ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
+ _state->get_query_ctx()->set_low_memory_mode();
bool is_high_wartermark = false;
bool is_low_wartermark = false;
workload_group->check_mem_used(&is_low_wartermark,
&is_high_wartermark);
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 715feceed98..30f3302d429 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -58,9 +58,8 @@ TaskScheduler::~TaskScheduler() {
Status TaskScheduler::start() {
int cores = _task_queue->cores();
- // Init the thread pool with cores+1 thread
+ // Init the thread pool with `cores` threads
// some for pipeline task running
- // 1 for spill disk query handler
RETURN_IF_ERROR(ThreadPoolBuilder(_name)
.set_min_threads(cores)
.set_max_threads(cores)
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index eb2beb2ba05..fdbfc7c7217 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -264,6 +264,54 @@ public:
}
}
+ // A query will run in low memory mode when:
+ // 1. the query enables spill and the wg's low water mark is reached; if buffers are not released, it will trigger a spill to disk, which is very expensive.
+ // 2. the query does not enable spill, but the wg's high water mark is reached; if buffers are not released, the query will be cancelled.
+ // 3. the process reaches the soft mem_limit; if buffers are not released, the query will be cancelled.
+ // 4. the query fails to reserve memory.
+ // Under low memory mode, the query should release some buffers such as the scan operator block queue, union operator queue, exchange buffer, and streaming agg.
+ bool low_memory_mode() {
+ if (_low_memory_mode) {
+ return true;
+ }
+
+ // If less than 100MB left, then it is low memory mode
+ if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024
* 1024)) {
+ _low_memory_mode = true;
+ LOG(INFO) << "Query " << print_id(_query_id)
+ << " goes to low memory mode due to exceed process soft
memory limit";
+ return true;
+ }
+
+ if (_workload_group) {
+ bool is_low_wartermark = false;
+ bool is_high_wartermark = false;
+ _workload_group->check_mem_used(&is_low_wartermark,
&is_high_wartermark);
+ if (is_high_wartermark) {
+ LOG(INFO)
+ << "Query " << print_id(_query_id)
+ << " goes to low memory mode due to workload group
high water mark reached";
+ _low_memory_mode = true;
+ return true;
+ }
+
+ if (is_low_wartermark &&
+ ((_query_options.__isset.enable_join_spill &&
_query_options.enable_join_spill) ||
+ (_query_options.__isset.enable_sort_spill &&
_query_options.enable_sort_spill) ||
+ (_query_options.__isset.enable_agg_spill &&
_query_options.enable_agg_spill))) {
+ LOG(INFO) << "Query " << print_id(_query_id)
+ << " goes to low memory mode due to workload group
low water mark "
+ "reached and the query enable spill";
+ _low_memory_mode = true;
+ return true;
+ }
+ }
+
+ return _low_memory_mode;
+ }
+
+ void set_low_memory_mode() { _low_memory_mode = true; }
+
private:
int _timeout_second;
TUniqueId _query_id;
@@ -313,6 +361,7 @@ private:
std::mutex _pipeline_map_write_lock;
std::atomic<int64_t> _spill_threshold {0};
+ std::atomic<bool> _low_memory_mode = false;
std::mutex _profile_mutex;
timespec _query_arrival_timestamp;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 39ab564db63..431eaa248ac 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -335,6 +335,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
if (!is_low_wartermark && !is_high_wartermark) {
+ // TODO: should check whether there is a large reserve size in the query's operators.
+ // If one exists, then should find the query and spill it.
LOG(INFO) << "**** there are " << queries_list.size() << " to
resume";
for (const auto& query : queries_list) {
LOG(INFO) << "**** resume paused query: " << query.query_id();
@@ -359,6 +361,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
size_t max_memory_usage = 0;
auto it_to_remove = queries_list.end();
+ // TODO: should check buffer-type memory first; if much of this memory can be released, then there is no need to spill to disk.
+ // Buffer memory includes:
+ // 1. caches: page cache, segment cache...
+ // 2. memtables: load memtable
+ // 3. scan queue, exchange sink buffer, union queue
+ // 4. streaming aggs.
+ // If we could not recycle memory from these buffers (< 10%), then spill to disk.
+
for (auto query_it = queries_list.begin(); query_it !=
queries_list.end();) {
const auto query_ctx = query_it->query_ctx_.lock();
// The query is finished during in paused list.
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index cbb3d0f5723..ed50c195d74 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -266,7 +266,10 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
update_peak_memory_usage(-current_block->allocated_bytes());
// consume current block
block->swap(*current_block);
- return_free_block(std::move(current_block));
+ // If under low memory mode, should not return the free block, because it will occupy too much memory.
+ if (!_local_state->low_memory_mode()) {
+ return_free_block(std::move(current_block));
+ }
} else {
// This scan task do not have any cached blocks.
_blocks_queue.pop_front();
@@ -275,37 +278,54 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
_num_finished_scanners++;
std::weak_ptr<ScannerDelegate> next_scanner;
// submit one of the remaining scanners
- if (_scanners.try_dequeue(next_scanner)) {
- auto submit_status =
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
- if (!submit_status.ok()) {
- _process_status = submit_status;
- _set_scanner_done();
- return _process_status;
- }
- } else {
- // no more scanner to be scheduled
- // `_free_blocks` serve all running scanners, maybe it's
too large for the remaining scanners
- int free_blocks_for_each = _free_blocks.size_approx() /
_num_running_scanners;
+ // If under low memory mode, then there should be at most 4 scanners running
+ if (_num_running_scanners > low_memory_mode_scanners() &&
+ _local_state->low_memory_mode()) {
_num_running_scanners--;
- for (int i = 0; i < free_blocks_for_each; ++i) {
- vectorized::BlockUPtr removed_block;
- if (_free_blocks.try_dequeue(removed_block)) {
- _block_memory_usage -= block->allocated_bytes();
+ } else {
+ if (_scanners.try_dequeue(next_scanner)) {
+ auto submit_status =
+
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
+ if (!submit_status.ok()) {
+ _process_status = submit_status;
+ _set_scanner_done();
+ return _process_status;
+ }
+ } else {
+ // no more scanner to be scheduled
+ // `_free_blocks` serve all running scanners, maybe
it's too large for the remaining scanners
+ int free_blocks_for_each =
+ _free_blocks.size_approx() /
_num_running_scanners;
+ _num_running_scanners--;
+ for (int i = 0; i < free_blocks_for_each; ++i) {
+ vectorized::BlockUPtr removed_block;
+ if (_free_blocks.try_dequeue(removed_block)) {
+ _block_memory_usage -=
block->allocated_bytes();
+ }
}
}
}
} else {
- // resubmit current running scanner to read the next block
- Status submit_status = submit_scan_task(scan_task);
- if (!submit_status.ok()) {
- _process_status = submit_status;
- _set_scanner_done();
- return _process_status;
+ if (_local_state->low_memory_mode() &&
+ _num_running_scanners > low_memory_mode_scanners()) {
+ _num_running_scanners--;
+ // put the scanner back into the `_scanners` queue since it is not eos
+ _scanners.enqueue(scan_task->scanner);
+ } else {
+ // resubmit current running scanner to read the next block
+ Status submit_status = submit_scan_task(scan_task);
+ if (!submit_status.ok()) {
+ _process_status = submit_status;
+ _set_scanner_done();
+ return _process_status;
+ }
}
}
}
- // scale up
- RETURN_IF_ERROR(_try_to_scale_up());
+ if (_local_state->low_memory_mode()) {
+ // scale up
+ RETURN_IF_ERROR(_try_to_scale_up());
+ }
}
if (_num_finished_scanners == _all_scanners.size() &&
_blocks_queue.empty()) {
@@ -488,4 +508,8 @@ void ScannerContext::update_peak_memory_usage(int64_t
usage) {
_local_state->_scanner_peak_memory_usage->add(usage);
}
+bool ScannerContext::low_memory_mode() const {
+ return _local_state->low_memory_mode();
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 03c4e5a4f1b..cb9bd8e71b0 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -171,6 +171,17 @@ public:
int batch_size() const { return _batch_size; }
+ // During low memory mode, there will be at most 4 scanners running and every scanner will
+ // cache at most 2MB of data, so every instance will keep at most 8MB of buffer.
+ bool low_memory_mode() const;
+
+ // TODO(yiguolei) add this as session variable
+ int32_t low_memory_mode_scan_bytes_per_scanner() const {
+ return 2 * 1024 * 1024; // 2MB
+ }
+
+ int32_t low_memory_mode_scanners() const { return 4; }
+
// the unique id of this context
std::string ctx_id;
TUniqueId _query_id;
@@ -179,6 +190,9 @@ public:
bool _should_reset_thread_name = true;
+ const static int LOW_MEMORY_MODE_SCAN_BYTES = 2 * 1024 * 1024; // 2MB
+ const static int LOW_MEMORY_MODE_MAX_SCANNERS = 4;
+
protected:
/// Four criteria to determine whether to increase the parallelism of the
scanners
/// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
@@ -217,7 +231,7 @@ protected:
moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
int32_t _num_scheduled_scanners = 0;
int32_t _num_finished_scanners = 0;
- int32_t _num_running_scanners = 0;
+ std::atomic_int _num_running_scanners = 0;
// weak pointer for _scanners, used in stop function
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
const int _num_parallel_instances;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 444ff4dbb0c..2352f10ca0c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -264,7 +264,14 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
- size_t raw_bytes_read = 0; bool first_read = true;
+ if (ctx->low_memory_mode() &&
+ raw_bytes_threshold >
ctx->low_memory_mode_scan_bytes_per_scanner()) {
+ raw_bytes_threshold =
ctx->low_memory_mode_scan_bytes_per_scanner();
+ }
+
+ size_t raw_bytes_read = 0;
+ bool first_read = true;
+
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]