This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 9bf2285c008 [opt](spill) Optimize the logic for triggering spilling (#46699)
9bf2285c008 is described below
commit 9bf2285c0084d6a9412a0444856f22362d6c62a5
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jan 10 16:01:36 2025 +0800
[opt](spill) Optimize the logic for triggering spilling (#46699)
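A rough summary of the change, with a small self-contained C++ sketch of the new
trigger decision. The sketch uses simplified stand-in names (Action,
on_reserve_failure), not the actual Doris classes; it only illustrates the check
introduced in pipeline_task.cpp below. Per the diff, when reserving memory for the
sink fails, the task is now paused for spilling only if the sink's revocable memory
has reached spill_min_revocable_mem; otherwise the query is switched into
low-memory mode and execution continues. Reservation failures in the operator path
and in the scanner scheduler no longer pause the query at all; they only set
low-memory mode.

    #include <cstdint>
    #include <iostream>

    enum class Action { PauseAndSpill, LowMemoryMode };

    // Mirrors the check added in pipeline_task.cpp: pausing a task (so the
    // workload group manager can later ask it to spill) is only worthwhile
    // when the sink already holds enough revocable memory; otherwise the
    // query just switches to low-memory mode and keeps running.
    Action on_reserve_failure(int64_t revocable_mem_size, int64_t spill_min_revocable_mem) {
        return revocable_mem_size >= spill_min_revocable_mem ? Action::PauseAndSpill
                                                             : Action::LowMemoryMode;
    }

    int main() {
        // 8 MB revocable vs. a 32 MB minimum: not worth spilling yet.
        std::cout << (on_reserve_failure(8LL << 20, 32LL << 20) == Action::LowMemoryMode) << '\n';
        // 64 MB revocable: pause the query and let it spill.
        std::cout << (on_reserve_failure(64LL << 20, 32LL << 20) == Action::PauseAndSpill) << '\n';
        return 0;
    }

The point of the threshold is that pausing only pays off when there is enough
revocable memory for a spill to actually relieve pressure.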
---
 be/src/pipeline/pipeline_fragment_context.cpp | 18 +++++++------
 be/src/pipeline/pipeline_task.cpp             | 38 +++++++++++++++------------
 be/src/vec/exec/scan/scanner_scheduler.cpp    |  3 +--
3 files changed, 32 insertions(+), 27 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7cf1a963105..09a14c66a7f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1279,11 +1279,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         /// If `group_by_limit_opt` is true, then it might not need to spill at all.
         const bool enable_spill = _runtime_state->enable_spill() &&
                                   !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
-
-        if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
-            request.query_options.__isset.enable_distinct_streaming_aggregation &&
-            request.query_options.enable_distinct_streaming_aggregation &&
-            !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
+        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
+                                      tnode.agg_node.use_streaming_preaggregation &&
+                                      !tnode.agg_node.grouping_exprs.empty();
+        const bool can_use_distinct_streaming_agg =
+                is_streaming_agg && tnode.agg_node.aggregate_functions.empty() &&
+                request.query_options.__isset.enable_distinct_streaming_aggregation &&
+                request.query_options.enable_distinct_streaming_aggregation;
+
+        if (can_use_distinct_streaming_agg) {
if (enable_query_cache) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
@@ -1305,9 +1309,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             RETURN_IF_ERROR(cur_pipe->add_operator(
                     op, request.__isset.parallel_instances ? request.parallel_instances : 0));
}
- } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
- tnode.agg_node.use_streaming_preaggregation &&
- !tnode.agg_node.grouping_exprs.empty()) {
+ } else if (is_streaming_agg) {
if (enable_query_cache) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 960b5a813ce..ed40731bfd1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,11 +426,8 @@ Status PipelineTask::execute(bool* eos) {
debug_msg += fmt::format(", debug info: {}",
GlobalMemoryArbitrator::process_mem_log_str());
}
-                LOG(INFO) << debug_msg;
-
-                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), reserve_size, st);
-                continue;
+                LOG_EVERY_N(INFO, 100) << debug_msg;
+                _state->get_query_ctx()->set_low_memory_mode();
}
}
@@ -443,11 +440,13 @@ Status PipelineTask::execute(bool* eos) {
Status status = Status::OK();
DEFER_RELEASE_RESERVED();
COUNTER_UPDATE(_memory_reserve_times, 1);
-            const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
auto workload_group = _state->get_query_ctx()->workload_group();
if (_state->enable_reserve_memory() && workload_group &&
!(wake_up_early() || _dry_run)) {
-                status = thread_context()->try_reserve_memory(sink_reserve_size);
+                const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
+                status = sink_reserve_size != 0
+                                 ? thread_context()->try_reserve_memory(sink_reserve_size)
+                                 : Status::OK();
if (status.ok() && _state->enable_force_spill() &&
_sink->is_spillable() &&
_sink->revocable_mem_size(_state) >=
@@ -468,16 +467,21 @@ Status PipelineTask::execute(bool* eos) {
debug_msg += fmt::format(", debug info: {}",
GlobalMemoryArbitrator::process_mem_log_str());
}
-                    VLOG_DEBUG << debug_msg;
-
-                    DCHECK_EQ(_pending_block.get(), nullptr);
-                    _pending_block = std::move(_block);
-                    _block = vectorized::Block::create_unique(_pending_block->clone_empty());
-                    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                            _state->get_query_ctx()->shared_from_this(), sink_reserve_size, status);
-                    _pending_eos = *eos;
-                    *eos = false;
-                    continue;
+
+                    if (_sink->revocable_mem_size(_state) >= _state->spill_min_revocable_mem()) {
+                        VLOG_DEBUG << debug_msg;
+                        DCHECK_EQ(_pending_block.get(), nullptr);
+                        _pending_block = std::move(_block);
+                        _block = vectorized::Block::create_unique(_pending_block->clone_empty());
+                        ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                                _state->get_query_ctx()->shared_from_this(), sink_reserve_size,
+                                status);
+                        _pending_eos = *eos;
+                        *eos = false;
+                        continue;
+                    } else {
+                        _state->get_query_ctx()->set_low_memory_mode();
+                    }
}
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index cd7f6cfb0ed..8a2387cc69e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -207,8 +207,7 @@ void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerC
}
LOG(INFO) << debug_msg;
- ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- state->get_query_ctx()->shared_from_this(), reserve_size, st);
+ state->get_query_ctx()->set_low_memory_mode();
}
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]