This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e1016b1f0359074f591babaeed9008f11fd5030f
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Sep 13 15:41:53 2024 +0800

    Avoid an infinite loop when memory reservation fails
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  9 ---------
 .../exec/partitioned_hash_join_sink_operator.cpp   |  5 ++++-
 be/src/pipeline/pipeline_task.cpp                  | 23 ++++++++++++++--------
 3 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 0ddc533bcea..ef706dd0dea 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -137,15 +137,6 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
         size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
     }
     size_to_reserve += _evaluate_mem_usage;
-
-    if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
-        LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", 
rows: " << rows
-                  << ", bucket_size: " << bucket_size
-                  << ", mutable block size: " << 
_build_side_mutable_block.allocated_bytes()
-                  << ", mutable block cols: " << 
_build_side_mutable_block.columns()
-                  << ", _build_col_ids.size: " << _build_col_ids.size();
-    }
-
     return size_to_reserve;
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index fc227978d4c..bb0efb17cb9 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -27,6 +27,7 @@
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
 #include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -552,7 +553,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         if (eos) {
             VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash 
join sink "
                        << node_id() << " sink eos, set_ready_to_read"
-                       << ", task id: " << state->task_id() << ", need spil: " 
<< need_to_spill;
+                       << ", task id: " << state->task_id() << ", need spill: 
" << need_to_spill;
 
             if (!need_to_spill) {
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) 
{
@@ -594,6 +595,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
         if (eos) {
             return revoke_memory(state);
+        } else if (revocable_mem_size(state) > 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            return revoke_memory(state);
         }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 1a6f8548687..f7e8da9f948 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -396,15 +396,22 @@ Status PipelineTask::execute(bool* eos) {
                     bool is_high_wartermark = false;
                     bool is_low_wartermark = false;
                     workload_group->check_mem_used(&is_low_wartermark, 
&is_high_wartermark);
-                    if (is_low_wartermark || is_high_wartermark) {
-                        /// The larger reserved memory size is likely due to a 
larger available revocable size.
-                        /// If the available memory for revoking is large 
enough, here trigger revoking proactively.
-                        if (_sink->revocable_mem_size(_state) > 512L * 1024 * 
1024) {
-                            LOG(INFO) << "query: " << print_id(query_id)
-                                      << " has big memory to revoke.";
-                            RETURN_IF_ERROR(_sink->revoke_memory(_state));
-                        }
 
+                    /// The larger reserved memory size is likely due to a 
larger available revocable size.
+                    /// If the available memory for revoking is large enough, 
here trigger revoking proactively.
+                    bool need_to_pause = false;
+                    const auto revocable_mem_size = 
_sink->revocable_mem_size(_state);
+                    if (revocable_mem_size > 1024L * 1024 * 1024) {
+                        LOG(INFO) << "query: " << print_id(query_id)
+                                  << ", task id: " << _state->task_id()
+                                  << " has big memory to revoke: " << 
revocable_mem_size;
+                        RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                        need_to_pause = true;
+                    } else {
+                        need_to_pause = is_low_wartermark || 
is_high_wartermark;
+                    }
+
+                    if (need_to_pause) {
                         _memory_sufficient_dependency->block();
                         
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                                 _state->get_query_ctx()->shared_from_this());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to