This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit e1016b1f0359074f591babaeed9008f11fd5030f Author: Jerry Hu <[email protected]> AuthorDate: Fri Sep 13 15:41:53 2024 +0800 Avoid dead-loop when reserve failed --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 9 --------- .../exec/partitioned_hash_join_sink_operator.cpp | 5 ++++- be/src/pipeline/pipeline_task.cpp | 23 ++++++++++++++-------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 0ddc533bcea..ef706dd0dea 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -137,15 +137,6 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) { size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited } size_to_reserve += _evaluate_mem_usage; - - if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] { - LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", rows: " << rows - << ", bucket_size: " << bucket_size - << ", mutable block size: " << _build_side_mutable_block.allocated_bytes() - << ", mutable block cols: " << _build_side_mutable_block.columns() - << ", _build_col_ids.size: " << _build_col_ids.size(); - } - return size_to_reserve; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index fc227978d4c..bb0efb17cb9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -27,6 +27,7 @@ #include "runtime/fragment_mgr.h" #include "util/mem_info.h" #include "util/runtime_profile.h" +#include "vec/spill/spill_stream.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { @@ -552,7 +553,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (eos) { VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink 
" << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id() << ", need spil: " << need_to_spill; + << ", task id: " << state->task_id() << ", need spill: " << need_to_spill; if (!need_to_spill) { if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { @@ -594,6 +595,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows)); if (eos) { return revoke_memory(state); + } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { + return revoke_memory(state); } } else { if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 1a6f8548687..f7e8da9f948 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -396,15 +396,22 @@ Status PipelineTask::execute(bool* eos) { bool is_high_wartermark = false; bool is_low_wartermark = false; workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark); - if (is_low_wartermark || is_high_wartermark) { - /// The larger reserved memory size is likely due to a larger available revocable size. - /// If the available memory for revoking is large enough, here trigger revoking proactively. - if (_sink->revocable_mem_size(_state) > 512L * 1024 * 1024) { - LOG(INFO) << "query: " << print_id(query_id) - << " has big memory to revoke."; - RETURN_IF_ERROR(_sink->revoke_memory(_state)); - } + /// The larger reserved memory size is likely due to a larger available revocable size. + /// If the available memory for revoking is large enough, here trigger revoking proactively. 
+ bool need_to_pause = false; + const auto revocable_mem_size = _sink->revocable_mem_size(_state); + if (revocable_mem_size > 1024L * 1024 * 1024) { + LOG(INFO) << "query: " << print_id(query_id) + << ", task id: " << _state->task_id() + << " has big memory to revoke: " << revocable_mem_size; + RETURN_IF_ERROR(_sink->revoke_memory(_state)); + need_to_pause = true; + } else { + need_to_pause = is_low_wartermark || is_high_wartermark; + } + + if (need_to_pause) { _memory_sufficient_dependency->block(); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
