This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 351ba4aeb2c [opt](spill) handle oom exception in spill tasks (#35025) (#35171)
351ba4aeb2c is described below

commit 351ba4aeb2c9a1859db6442758ea0d1446191c35
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Jul 15 10:33:33 2024 +0800

    [opt](spill) handle oom exception in spill tasks (#35025) (#35171)
---
 .../pipeline/exec/partition_sort_sink_operator.cpp |  73 +++--
 .../exec/partitioned_aggregation_sink_operator.cpp |  17 +-
 .../partitioned_aggregation_source_operator.cpp    | 145 ++++-----
 .../exec/partitioned_hash_join_probe_operator.cpp  | 341 +++++++--------------
 .../exec/partitioned_hash_join_probe_operator.h    |   5 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  60 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 143 ++++-----
 .../pipeline/exec/spill_sort_source_operator.cpp   |  35 ++-
 .../exec/streaming_aggregation_operator.cpp        | 130 ++++----
 be/src/vec/spill/spill_stream.cpp                  |   8 +-
 be/src/vec/spill/spill_stream.h                    |   4 +
 11 files changed, 458 insertions(+), 503 deletions(-)

diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index abe2fde555e..f820914b33e 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -180,42 +180,49 @@ Status 
PartitionSortSinkOperatorX::_emplace_into_hash_table(
         const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* 
input_block,
         PartitionSortSinkLocalState& local_state, bool eos) {
     return std::visit(
-            [&](auto&& agg_method) -> Status {
-                SCOPED_TIMER(local_state._build_timer);
-                using HashMethodType = std::decay_t<decltype(agg_method)>;
-                using AggState = typename HashMethodType::State;
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> Status {
+                        return Status::InternalError("Unit hash table");
+                    },
+                    [&](auto& agg_method) -> Status {
+                        SCOPED_TIMER(local_state._build_timer);
+                        using HashMethodType = 
std::decay_t<decltype(agg_method)>;
+                        using AggState = typename HashMethodType::State;
 
-                AggState state(key_columns);
-                size_t num_rows = input_block->rows();
-                agg_method.init_serialized_keys(key_columns, num_rows);
+                        AggState state(key_columns);
+                        size_t num_rows = input_block->rows();
+                        agg_method.init_serialized_keys(key_columns, num_rows);
 
-                auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-                    HashMethodType::try_presis_key(key, origin, 
*local_state._agg_arena_pool);
-                    auto* aggregate_data = _pool->add(new 
vectorized::PartitionBlocks(
-                            local_state._partition_sort_info, 
local_state._value_places.empty()));
-                    local_state._value_places.push_back(aggregate_data);
-                    ctor(key, aggregate_data);
-                    local_state._num_partition++;
-                };
-                auto creator_for_null_key = [&](auto& mapped) {
-                    mapped = _pool->add(new vectorized::PartitionBlocks(
-                            local_state._partition_sort_info, 
local_state._value_places.empty()));
-                    local_state._value_places.push_back(mapped);
-                    local_state._num_partition++;
-                };
+                        auto creator = [&](const auto& ctor, auto& key, auto& 
origin) {
+                            HashMethodType::try_presis_key(key, origin,
+                                                           
*local_state._agg_arena_pool);
+                            auto* aggregate_data = _pool->add(new 
vectorized::PartitionBlocks(
+                                    local_state._partition_sort_info,
+                                    local_state._value_places.empty()));
+                            
local_state._value_places.push_back(aggregate_data);
+                            ctor(key, aggregate_data);
+                            local_state._num_partition++;
+                        };
+                        auto creator_for_null_key = [&](auto& mapped) {
+                            mapped = _pool->add(new 
vectorized::PartitionBlocks(
+                                    local_state._partition_sort_info,
+                                    local_state._value_places.empty()));
+                            local_state._value_places.push_back(mapped);
+                            local_state._num_partition++;
+                        };
 
-                SCOPED_TIMER(local_state._emplace_key_timer);
-                for (size_t row = 0; row < num_rows; ++row) {
-                    auto& mapped =
-                            agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
-                    mapped->add_row_idx(row);
-                }
-                for (auto* place : local_state._value_places) {
-                    SCOPED_TIMER(local_state._selector_block_timer);
-                    
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
-                }
-                return Status::OK();
-            },
+                        SCOPED_TIMER(local_state._emplace_key_timer);
+                        for (size_t row = 0; row < num_rows; ++row) {
+                            auto& mapped = agg_method.lazy_emplace(state, row, 
creator,
+                                                                   
creator_for_null_key);
+                            mapped->add_row_idx(row);
+                        }
+                        for (auto* place : local_state._value_places) {
+                            SCOPED_TIMER(local_state._selector_block_timer);
+                            
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
+                        }
+                        return Status::OK();
+                    }},
             local_state._partitioned_data->method_variant);
 }
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index b610e1b9ed3..92cd341de19 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -298,12 +298,17 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                 }};
                 auto* runtime_state = _runtime_state.get();
                 auto* agg_data = 
parent._agg_sink_operator->get_agg_data(runtime_state);
-                Base::_shared_state->sink_status = std::visit(
-                        [&](auto&& agg_method) -> Status {
-                            auto& hash_table = *agg_method.hash_table;
-                            return _spill_hash_table(state, agg_method, 
hash_table, _eos);
-                        },
-                        agg_data->method_variant);
+                Base::_shared_state->sink_status =
+                        std::visit(vectorized::Overload {
+                                           [&](std::monostate& arg) -> Status {
+                                               return 
Status::InternalError("Unit hash table");
+                                           },
+                                           [&](auto& agg_method) -> Status {
+                                               auto& hash_table = 
*agg_method.hash_table;
+                                               
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
+                                                       state, agg_method, 
hash_table, _eos));
+                                           }},
+                                   agg_data->method_variant);
                 RETURN_IF_ERROR(Base::_shared_state->sink_status);
                 Base::_shared_state->sink_status =
                         
parent._agg_sink_operator->reset_hash_table(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 43c805b9557..6d871451bfd 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -214,80 +214,83 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
+    auto spill_func = [this, state, query_id, execution_context, submit_timer] 
{
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        Defer defer {[&]() {
+            if (!_status.ok() || state->is_cancelled()) {
+                if (!_status.ok()) {
+                    LOG(WARNING) << "query " << print_id(query_id) << " agg 
node "
+                                 << _parent->node_id()
+                                 << " merge spilled agg data error: " << 
_status;
+                }
+                _shared_state->close();
+            } else if (_shared_state->spill_partitions.empty()) {
+                VLOG_DEBUG << "query " << print_id(query_id) << " agg node " 
<< _parent->node_id()
+                           << " merge spilled agg data finish";
+            }
+            
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
+            _is_merging = false;
+            _dependency->Dependency::set_ready();
+        }};
+        bool has_agg_data = false;
+        auto& parent = Base::_parent->template cast<Parent>();
+        while (!state->is_cancelled() && !has_agg_data &&
+               !_shared_state->spill_partitions.empty()) {
+            for (auto& stream : 
_shared_state->spill_partitions[0]->spill_streams_) {
+                stream->set_read_counters(Base::_spill_read_data_time,
+                                          Base::_spill_deserialize_time, 
Base::_spill_read_bytes,
+                                          Base::_spill_read_wait_io_timer);
+                vectorized::Block block;
+                bool eos = false;
+                while (!eos && !state->is_cancelled()) {
+                    {
+                        SCOPED_TIMER(Base::_spill_recover_time);
+                        _status = stream->read_next_block_sync(&block, &eos);
+                    }
+                    RETURN_IF_ERROR(_status);
+
+                    if (!block.empty()) {
+                        has_agg_data = true;
+                        _status = parent._agg_source_operator
+                                          
->merge_with_serialized_key_helper<false>(
+                                                  _runtime_state.get(), 
&block);
+                        RETURN_IF_ERROR(_status);
+                    }
+                }
+                
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+            }
+            _shared_state->spill_partitions.pop_front();
+        }
+        if (_shared_state->spill_partitions.empty()) {
+            _shared_state->close();
+        }
+        return _status;
+    };
+
+    auto exception_catch_func = [spill_func, query_id, mem_tracker, 
shared_state_holder,
+                                 execution_context, this]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
+
+        if (!status.ok()) {
+            _status = status;
+        }
+    };
 
     RETURN_IF_ERROR(
             
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-                    [this, state, query_id, mem_tracker, shared_state_holder, 
execution_context,
-                     submit_timer] {
-                        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-                        std::shared_ptr<TaskExecutionContext> 
execution_context_lock;
-                        auto shared_state_sptr = shared_state_holder.lock();
-                        if (shared_state_sptr) {
-                            execution_context_lock = execution_context.lock();
-                        }
-                        if (!shared_state_sptr || !execution_context_lock) {
-                            LOG(INFO) << "query " << print_id(query_id)
-                                      << " execution_context released, maybe 
query was cancelled.";
-                            // FIXME: return status is meaningless?
-                            return Status::Cancelled("Cancelled");
-                        }
-
-                        
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                        Defer defer {[&]() {
-                            if (!_status.ok() || state->is_cancelled()) {
-                                if (!_status.ok()) {
-                                    LOG(WARNING) << "query " << 
print_id(query_id) << " agg node "
-                                                 << _parent->node_id()
-                                                 << " merge spilled agg data 
error: " << _status;
-                                }
-                                _shared_state->close();
-                            } else if 
(_shared_state->spill_partitions.empty()) {
-                                VLOG_DEBUG << "query " << print_id(query_id) 
<< " agg node "
-                                           << _parent->node_id()
-                                           << " merge spilled agg data finish";
-                            }
-                            
Base::_shared_state->in_mem_shared_state->aggregate_data_container
-                                    ->init_once();
-                            _is_merging = false;
-                            _dependency->Dependency::set_ready();
-                        }};
-                        bool has_agg_data = false;
-                        auto& parent = Base::_parent->template cast<Parent>();
-                        while (!state->is_cancelled() && !has_agg_data &&
-                               !_shared_state->spill_partitions.empty()) {
-                            for (auto& stream :
-                                 
_shared_state->spill_partitions[0]->spill_streams_) {
-                                stream->set_read_counters(
-                                        Base::_spill_read_data_time, 
Base::_spill_deserialize_time,
-                                        Base::_spill_read_bytes, 
Base::_spill_read_wait_io_timer);
-                                vectorized::Block block;
-                                bool eos = false;
-                                while (!eos && !state->is_cancelled()) {
-                                    {
-                                        
SCOPED_TIMER(Base::_spill_recover_time);
-                                        _status = 
stream->read_next_block_sync(&block, &eos);
-                                    }
-                                    RETURN_IF_ERROR(_status);
-
-                                    if (!block.empty()) {
-                                        has_agg_data = true;
-                                        _status = parent._agg_source_operator
-                                                          
->merge_with_serialized_key_helper<false>(
-                                                                  
_runtime_state.get(), &block);
-                                        RETURN_IF_ERROR(_status);
-                                    }
-                                }
-                                (void)ExecEnv::GetInstance()
-                                        ->spill_stream_mgr()
-                                        ->delete_spill_stream(stream);
-                            }
-                            _shared_state->spill_partitions.pop_front();
-                        }
-                        if (_shared_state->spill_partitions.empty()) {
-                            _shared_state->close();
-                        }
-                        return _status;
-                    }));
+                    exception_catch_func));
     return Status::OK();
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 628be711b41..fc006764452 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -153,28 +153,7 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* 
state,
-                                                             uint32_t 
partition_index) {
-    auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
-    auto& mutable_block = partitioned_build_blocks[partition_index];
-    if (!mutable_block ||
-        mutable_block->allocated_bytes() < 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-        --_spilling_task_count;
-        return Status::OK();
-    }
-
-    auto& build_spilling_stream = 
_shared_state->spilled_streams[partition_index];
-    if (!build_spilling_stream) {
-        
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                state, build_spilling_stream, print_id(state->query_id()), 
"hash_build_sink",
-                _parent->id(), std::numeric_limits<int32_t>::max(),
-                std::numeric_limits<size_t>::max(), _runtime_profile.get()));
-        RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
-        
build_spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                                  _spill_data_size, 
_spill_write_disk_timer,
-                                                  _spill_write_wait_io_timer);
-    }
-
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto execution_context = state->get_task_execution_context();
     /// Resources in shared state will be released when the operator is closed,
@@ -182,14 +161,56 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
     /// So, we need hold the pointer of shared state.
     std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
             _shared_state->shared_from_this();
+
     auto query_id = state->query_id();
     auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
-    return spill_io_pool->submit_func([query_id, mem_tracker, 
shared_state_holder,
-                                       execution_context, state, 
&build_spilling_stream,
-                                       &mutable_block, submit_timer, this] {
+    auto spill_func = [query_id, state, submit_timer, this] {
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        SCOPED_TIMER(_spill_probe_timer);
+
+        auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+        for (uint32_t partition_index = 0; partition_index != 
p._partition_count;
+             ++partition_index) {
+            auto& blocks = _probe_blocks[partition_index];
+            auto& partitioned_block = _partitioned_blocks[partition_index];
+            if (partitioned_block && partitioned_block->allocated_bytes() >=
+                                             
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                blocks.emplace_back(partitioned_block->to_block());
+                partitioned_block.reset();
+            }
+
+            auto& spilling_stream = _probe_spilling_streams[partition_index];
+            if (!spilling_stream) {
+                
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                        state, spilling_stream, print_id(state->query_id()), 
"hash_probe",
+                        _parent->id(), std::numeric_limits<int32_t>::max(),
+                        std::numeric_limits<size_t>::max(), 
_runtime_profile.get()));
+                RETURN_IF_ERROR(spilling_stream->prepare_spill());
+                spilling_stream->set_write_counters(
+                        _spill_serialize_block_timer, _spill_block_count, 
_spill_data_size,
+                        _spill_write_disk_timer, _spill_write_wait_io_timer);
+            }
+
+            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+            while (!blocks.empty() && !state->is_cancelled()) {
+                auto block = std::move(blocks.back());
+                blocks.pop_back();
+                RETURN_IF_ERROR(spilling_stream->spill_block(state, block, 
false));
+                COUNTER_UPDATE(_spill_probe_rows, block.rows());
+            }
+        }
+        VLOG_DEBUG << "query: " << print_id(query_id)
+                   << " hash probe revoke done, node: " << p.node_id()
+                   << ", task: " << state->task_id();
+        _dependency->set_ready();
+        return Status::OK();
+    };
+
+    auto exception_catch_func = [query_id, mem_tracker, shared_state_holder, 
execution_context,
+                                 spill_func, this]() {
         SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
         std::shared_ptr<TaskExecutionContext> execution_context_lock;
         auto shared_state_sptr = shared_state_holder.lock();
@@ -201,116 +222,18 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                       << " execution_context released, maybe query was 
cancelled.";
             return;
         }
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-        SCOPED_TIMER(_spill_build_timer);
-        if (_spill_status_ok) {
-            auto build_block = mutable_block->to_block();
-            DCHECK_EQ(mutable_block->rows(), 0);
-            auto st = build_spilling_stream->spill_block(state, build_block, 
false);
-            if (!st.ok()) {
-                std::unique_lock<std::mutex> lock(_spill_lock);
-                _spill_status_ok = false;
-                _spill_status = std::move(st);
-            } else {
-                COUNTER_UPDATE(_spill_build_rows, build_block.rows());
-                COUNTER_UPDATE(_spill_build_blocks, 1);
-            }
-        }
-
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        if (_spilling_task_count.fetch_sub(1) == 1) {
-            LOG(INFO) << "hash probe " << _parent->id()
-                      << " revoke memory spill_build_block finish";
-            _dependency->set_ready();
-        }
-    });
-}
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state,
-                                                              uint32_t 
partition_index) {
-    auto& spilling_stream = _probe_spilling_streams[partition_index];
-    if (!spilling_stream) {
-        
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                state, spilling_stream, print_id(state->query_id()), 
"hash_probe", _parent->id(),
-                std::numeric_limits<int32_t>::max(), 
std::numeric_limits<size_t>::max(),
-                _runtime_profile.get()));
-        RETURN_IF_ERROR(spilling_stream->prepare_spill());
-        spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                            _spill_data_size, 
_spill_write_disk_timer,
-                                            _spill_write_wait_io_timer);
-    }
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
 
-    auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
-
-    auto& blocks = _probe_blocks[partition_index];
-    auto& partitioned_block = _partitioned_blocks[partition_index];
-    if (partitioned_block && partitioned_block->allocated_bytes() >=
-                                     
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-        blocks.emplace_back(partitioned_block->to_block());
-        partitioned_block.reset();
-    }
-
-    if (!blocks.empty()) {
-        auto execution_context = state->get_task_execution_context();
-        /// Resources in shared state will be released when the operator is 
closed,
-        /// but there may be asynchronous spilling tasks at this time, which 
can lead to conflicts.
-        /// So, we need hold the pointer of shared state.
-        std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
-                _shared_state->shared_from_this();
-
-        auto query_id = state->query_id();
-        auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-
-        MonotonicStopWatch submit_timer;
-        submit_timer.start();
-        return spill_io_pool->submit_func([query_id, mem_tracker, 
shared_state_holder,
-                                           execution_context, state, &blocks, 
spilling_stream,
-                                           submit_timer, this] {
-            SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-            std::shared_ptr<TaskExecutionContext> execution_context_lock;
-            auto shared_state_sptr = shared_state_holder.lock();
-            if (shared_state_sptr) {
-                execution_context_lock = execution_context.lock();
-            }
-            if (!shared_state_sptr || !execution_context_lock) {
-                LOG(INFO) << "query: " << print_id(query_id)
-                          << " execution_context released, maybe query was 
cancelled.";
-                return;
-            }
-            _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-            SCOPED_TIMER(_spill_probe_timer);
-            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
-            while (!blocks.empty() && !state->is_cancelled()) {
-                auto block = std::move(blocks.back());
-                blocks.pop_back();
-                if (_spill_status_ok) {
-                    auto st = spilling_stream->spill_block(state, block, 
false);
-                    if (!st.ok()) {
-                        std::unique_lock<std::mutex> lock(_spill_lock);
-                        _spill_status_ok = false;
-                        _spill_status = std::move(st);
-                        break;
-                    }
-                    COUNTER_UPDATE(_spill_probe_rows, block.rows());
-                } else {
-                    break;
-                }
-            }
-
-            std::unique_lock<std::mutex> lock(_spill_lock);
-            if (_spilling_task_count.fetch_sub(1) == 1) {
-                LOG(INFO) << "hash probe " << _parent->id()
-                          << " revoke memory spill_probe_blocks finish";
-                _dependency->set_ready();
-            }
-        });
-    } else {
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        if (_spilling_task_count.fetch_sub(1) == 1) {
-            _dependency->set_ready();
+        if (!status.ok()) {
+            _spill_status_ok = false;
+            _spill_status = std::move(status);
         }
-    }
-    return Status::OK();
+        _dependency->set_ready();
+    };
+
+    _dependency->block();
+    return spill_io_pool->submit_func(exception_catch_func);
 }
 
 Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t 
partition_index) {
@@ -361,16 +284,10 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto read_func = [this, query_id, mem_tracker, state, spilled_stream = 
spilled_stream,
-                      &mutable_block, shared_state_holder, execution_context, 
submit_timer,
-                      partition_index] {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+    auto read_func = [this, query_id, state, spilled_stream = spilled_stream, 
&mutable_block,
+                      shared_state_holder, submit_timer, partition_index] {
         auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
+        if (!shared_state_sptr || state->is_cancelled()) {
             LOG(INFO) << "query: " << print_id(query_id)
                       << " execution_context released, maybe query was 
cancelled.";
             return;
@@ -378,15 +295,12 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
 
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_recovery_build_timer);
-        Defer defer([this] { --_spilling_task_count; });
-        DCHECK_EQ(_spill_status_ok.load(), true);
 
         bool eos = false;
         while (!eos) {
             vectorized::Block block;
             auto st = spilled_stream->read_next_block_sync(&block, &eos);
             if (!st.ok()) {
-                std::unique_lock<std::mutex> lock(_spill_lock);
                 _spill_status_ok = false;
                 _spill_status = std::move(st);
                 break;
@@ -409,7 +323,6 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                 DCHECK_EQ(mutable_block->columns(), block.columns());
                 st = mutable_block->merge(std::move(block));
                 if (!st.ok()) {
-                    std::unique_lock<std::mutex> lock(_spill_lock);
                     _spill_status_ok = false;
                     _spill_status = std::move(st);
                     break;
@@ -425,16 +338,36 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
         _dependency->set_ready();
     };
 
+    auto exception_catch_func = [read_func, query_id, mem_tracker, 
shared_state_holder,
+                                 execution_context, state, this]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
+            LOG(INFO) << "query: " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(read_func());
+            return Status::OK();
+        }();
+
+        if (!status.ok()) {
+            _spill_status_ok = false;
+            _spill_status = std::move(status);
+        }
+    };
+
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     has_data = true;
     _dependency->block();
 
-    ++_spilling_task_count;
-    auto st = spill_io_pool->submit_func(read_func);
-    if (!st.ok()) {
-        --_spilling_task_count;
-    }
-    return st;
+    return spill_io_pool->submit_func(exception_catch_func);
 }
 
 std::string PartitionedHashJoinProbeLocalState::debug_string(int 
indentation_level) const {
@@ -468,30 +401,14 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto read_func = [this, query_id, mem_tracker, shared_state_holder, 
execution_context,
-                      &spilled_stream, &blocks, submit_timer] {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query: " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return;
-        }
-
+    auto read_func = [this, query_id, &spilled_stream, &blocks, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_recovery_probe_timer);
-        Defer defer([this] { --_spilling_task_count; });
-        DCHECK_EQ(_spill_status_ok.load(), true);
 
         vectorized::Block block;
         bool eos = false;
         auto st = spilled_stream->read_next_block_sync(&block, &eos);
         if (!st.ok()) {
-            std::unique_lock<std::mutex> lock(_spill_lock);
             _spill_status_ok = false;
             _spill_status = std::move(st);
         } else {
@@ -510,16 +427,36 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
         _dependency->set_ready();
     };
 
+    auto exception_catch_func = [read_func, mem_tracker, shared_state_holder, 
execution_context,
+                                 query_id, this]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query: " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(read_func());
+            return Status::OK();
+        }();
+
+        if (!status.ok()) {
+            _spill_status_ok = false;
+            _spill_status = std::move(status);
+        }
+    };
+
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     DCHECK(spill_io_pool != nullptr);
     _dependency->block();
     has_data = true;
-    ++_spilling_task_count;
-    auto st = spill_io_pool->submit_func(read_func);
-    if (!st.ok()) {
-        --_spilling_task_count;
-    }
-    return st;
+    return spill_io_pool->submit_func(exception_catch_func);
 }
 
 
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
 pool,
@@ -701,15 +638,6 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
         return local_state._spill_status;
     }
 
-    if (_should_revoke_memory(state)) {
-        bool wait_for_io = false;
-        RETURN_IF_ERROR((const_cast<PartitionedHashJoinProbeOperatorX*>(this))
-                                ->_revoke_memory(state, wait_for_io));
-        if (wait_for_io) {
-            return Status::OK();
-        }
-    }
-
     const auto partition_index = local_state._partition_cursor;
     auto& probe_blocks = local_state._probe_blocks[partition_index];
     if (local_state._need_to_setup_internal_operators) {
@@ -792,20 +720,8 @@ bool 
PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st
 size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
     size_t mem_size = 0;
-    uint32_t spilling_start = local_state._child_eos ? 
local_state._partition_cursor + 1 : 0;
-    DCHECK_GE(spilling_start, local_state._partition_cursor);
-
-    auto& partitioned_build_blocks = 
local_state._shared_state->partitioned_build_blocks;
     auto& probe_blocks = local_state._probe_blocks;
-    for (uint32_t i = spilling_start; i < _partition_count; ++i) {
-        auto& build_block = partitioned_build_blocks[i];
-        if (build_block) {
-            auto block_bytes = build_block->allocated_bytes();
-            if (block_bytes >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                mem_size += build_block->allocated_bytes();
-            }
-        }
-
+    for (uint32_t i = 0; i < _partition_count; ++i) {
         for (auto& block : probe_blocks[i]) {
             mem_size += block.allocated_bytes();
         }
@@ -821,33 +737,12 @@ size_t 
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
-Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, 
bool& wait_for_io) {
+Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
     auto& local_state = get_local_state(state);
-    wait_for_io = false;
-    uint32_t spilling_start = local_state._child_eos ? 
local_state._partition_cursor + 1 : 0;
-    DCHECK_GE(spilling_start, local_state._partition_cursor);
-
-    if (_partition_count > spilling_start) {
-        local_state._spilling_task_count = (_partition_count - spilling_start) 
* 2;
-    } else {
-        return Status::OK();
-    }
-
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe 
node: " << id()
-               << ", task: " << state->task_id()
-               << ", revoke memory, spill task count: " << 
local_state._spilling_task_count;
-    for (uint32_t i = spilling_start; i < _partition_count; ++i) {
-        RETURN_IF_ERROR(local_state.spill_build_block(state, i));
-        RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
-    }
+               << ", task: " << state->task_id();
 
-    if (local_state._spilling_task_count > 0) {
-        std::unique_lock<std::mutex> lock(local_state._spill_lock);
-        if (local_state._spilling_task_count > 0) {
-            local_state._dependency->block();
-            wait_for_io = true;
-        }
-    }
+    RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
     return Status::OK();
 }
 
@@ -893,11 +788,7 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
 #endif
     if (need_more_input_data(state)) {
         if (need_to_spill && _should_revoke_memory(state)) {
-            bool wait_for_io = false;
-            RETURN_IF_ERROR(_revoke_memory(state, wait_for_io));
-            if (wait_for_io) {
-                return Status::OK();
-            }
+            return _revoke_memory(state);
         }
 
         RETURN_IF_ERROR(_child_x->get_block_after_projects(state, 
local_state._child_block.get(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 6be6c5a865b..db20efda67e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -48,8 +48,7 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
-    Status spill_build_block(RuntimeState* state, uint32_t partition_index);
-    Status spill_probe_blocks(RuntimeState* state, uint32_t partition_index);
+    Status spill_probe_blocks(RuntimeState* state);
 
     Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t 
partition_index,
                                            bool& has_data);
@@ -185,7 +184,7 @@ public:
     }
 
 private:
-    Status _revoke_memory(RuntimeState* state, bool& wait_for_io);
+    Status _revoke_memory(RuntimeState* state);
 
     friend class PartitionedHashJoinProbeLocalState;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d253a519b0c..45ca975a88c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -122,30 +122,15 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     /// So, we need hold the pointer of shared state.
     std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
             _shared_state->shared_from_this();
-
-    _dependency->block();
     auto query_id = state->query_id();
     auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-    auto spill_func = [shared_state_holder, execution_context,
-                       build_blocks = std::move(build_blocks), state, 
query_id, mem_tracker,
-                       num_slots, this]() mutable {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+    auto spill_func = [build_blocks = std::move(build_blocks), state, 
num_slots, this]() mutable {
         Defer defer {[&]() {
             // need to reset build_block here, or else build_block will be 
destructed
             // after SCOPED_ATTACH_TASK_WITH_ID and will trigger 
memory_orphan_check failure
             build_blocks.clear();
         }};
 
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
-            LOG(INFO) << "execution_context released, maybe query was 
canceled.";
-            return;
-        }
-
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
         std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
@@ -228,8 +213,36 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
         _dependency->set_ready();
     };
+
+    auto exception_catch_func = [spill_func, shared_state_holder, 
execution_context, state,
+                                 query_id, mem_tracker, this]() mutable {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
+            LOG(INFO) << "execution_context released, maybe query was 
canceled.";
+            return;
+        }
+
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(spill_func());
+            return Status::OK();
+        }();
+
+        if (!status.ok()) {
+            std::unique_lock<std::mutex> lock(_spill_lock);
+            _spill_status = status;
+            _spill_status_ok = false;
+            _dependency->set_ready();
+        }
+    };
     auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
-    return thread_pool->submit_func(spill_func);
+
+    _dependency->block();
+    return thread_pool->submit_func(exception_catch_func);
 }
 
 Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
@@ -288,7 +301,18 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
             }
             _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
             SCOPED_TIMER(_spill_build_timer);
-            _spill_to_disk(i, spilling_stream);
+
+            auto status = [&]() {
+                RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream));
+                return Status::OK();
+            }();
+
+            if (!status.ok()) { // ok() is the predicate; Status::OK() is the static factory
+                std::unique_lock<std::mutex> lock(_spill_lock);
+                _spill_status_ok = false;
+                _spill_status = std::move(status);
+                _dependency->set_ready(); // publish the error before waking the waiter
+            }
         });
 
         if (!st.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index c6a943c59b5..004283841ec 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -238,78 +238,83 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            [this, state, query_id, mem_tracker, shared_state_holder, &parent, 
execution_context,
-             submit_timer] {
-                SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-                std::shared_ptr<TaskExecutionContext> execution_context_lock;
-                auto shared_state_sptr = shared_state_holder.lock();
-                if (shared_state_sptr) {
-                    execution_context_lock = execution_context.lock();
-                }
-                if (!shared_state_sptr || !execution_context_lock) {
-                    LOG(INFO) << "query " << print_id(query_id)
-                              << " execution_context released, maybe query was 
cancelled.";
-                    return Status::OK();
+    auto spill_func = [this, state, query_id, &parent, submit_timer] {
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        Defer defer {[&]() {
+            if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
+                if (!_shared_state->sink_status.ok()) {
+                    LOG(WARNING) << "query " << print_id(query_id) << " sort 
node " << _parent->id()
+                                 << " revoke memory error: " << 
_shared_state->sink_status;
                 }
+                _shared_state->close();
+            } else {
+                VLOG_DEBUG << "query " << print_id(query_id) << " sort node " 
<< _parent->id()
+                           << " revoke memory finish";
+            }
 
-                
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                Defer defer {[&]() {
-                    if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
-                        if (!_shared_state->sink_status.ok()) {
-                            LOG(WARNING) << "query " << print_id(query_id) << 
" sort node "
-                                         << _parent->id()
-                                         << " revoke memory error: " << 
_shared_state->sink_status;
-                        }
-                        _shared_state->close();
-                    } else {
-                        VLOG_DEBUG << "query " << print_id(query_id) << " sort 
node "
-                                   << _parent->id() << " revoke memory finish";
-                    }
-
-                    if (!_shared_state->sink_status.ok()) {
-                        _shared_state->close();
-                    }
-
-                    _spilling_stream.reset();
-                    if (_eos) {
-                        _dependency->set_ready_to_read();
-                        _finish_dependency->set_ready();
-                    } else {
-                        _dependency->Dependency::set_ready();
-                    }
-                }};
-
-                _shared_state->sink_status =
-                        
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
-                RETURN_IF_ERROR(_shared_state->sink_status);
-
-                auto* sink_local_state = 
_runtime_state->get_sink_local_state();
-                update_profile(sink_local_state->profile());
-
-                bool eos = false;
-                vectorized::Block block;
-                while (!eos && !state->is_cancelled()) {
-                    {
-                        SCOPED_TIMER(_spill_merge_sort_timer);
-                        _shared_state->sink_status =
-                                
parent._sort_sink_operator->merge_sort_read_for_spill(
-                                        _runtime_state.get(), &block,
-                                        
_shared_state->spill_block_batch_row_count, &eos);
-                    }
-                    RETURN_IF_ERROR(_shared_state->sink_status);
-                    {
-                        SCOPED_TIMER(Base::_spill_timer);
-                        _shared_state->sink_status =
-                                _spilling_stream->spill_block(state, block, 
eos);
-                    }
-                    RETURN_IF_ERROR(_shared_state->sink_status);
-                    block.clear_column_data();
-                }
-                parent._sort_sink_operator->reset(_runtime_state.get());
+            if (!_shared_state->sink_status.ok()) {
+                _shared_state->close();
+            }
+
+            _spilling_stream.reset();
+            if (_eos) {
+                _dependency->set_ready_to_read();
+                _finish_dependency->set_ready();
+            } else {
+                _dependency->Dependency::set_ready();
+            }
+        }};
+
+        _shared_state->sink_status =
+                
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+        RETURN_IF_ERROR(_shared_state->sink_status);
+
+        auto* sink_local_state = _runtime_state->get_sink_local_state();
+        update_profile(sink_local_state->profile());
+
+        bool eos = false;
+        vectorized::Block block;
+        while (!eos && !state->is_cancelled()) {
+            {
+                SCOPED_TIMER(_spill_merge_sort_timer);
+                _shared_state->sink_status = 
parent._sort_sink_operator->merge_sort_read_for_spill(
+                        _runtime_state.get(), &block, 
_shared_state->spill_block_batch_row_count,
+                        &eos);
+            }
+            RETURN_IF_ERROR(_shared_state->sink_status);
+            {
+                SCOPED_TIMER(Base::_spill_timer);
+                _shared_state->sink_status = 
_spilling_stream->spill_block(state, block, eos);
+            }
+            RETURN_IF_ERROR(_shared_state->sink_status);
+            block.clear_column_data();
+        }
+        parent._sort_sink_operator->reset(_runtime_state.get());
 
-                return Status::OK();
-            });
+        return Status::OK();
+    };
+
+    auto exception_catch_func = [this, query_id, mem_tracker, 
shared_state_holder,
+                                 execution_context, spill_func]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+
+        _shared_state->sink_status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
+        }();
+    };
+
+    status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
+            exception_catch_func);
     if (!status.ok()) {
         if (!_eos) {
             Base::_dependency->Dependency::set_ready();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index fe6b4ee3efc..18a3d4310fd 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -98,20 +98,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto spill_func = [this, state, query_id, mem_tracker, &parent, 
shared_state_holder,
-                       execution_context, submit_timer] {
-        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-        auto shared_state_sptr = shared_state_holder.lock();
-        if (shared_state_sptr) {
-            execution_context_lock = execution_context.lock();
-        }
-        if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query " << print_id(query_id)
-                      << " execution_context released, maybe query was 
cancelled.";
-            return Status::OK();
-        }
-
+    auto spill_func = [this, state, query_id, &parent, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_spill_merge_sort_timer);
         Defer defer {[&]() {
@@ -185,8 +172,26 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         }
         return Status::OK();
     };
+
+    auto exception_catch_func = [this, query_id, mem_tracker, 
shared_state_holder,
+                                 execution_context, spill_func]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+
+        _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); 
}();
+    };
+
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            spill_func);
+            exception_catch_func);
 }
 
 Status SpillSortLocalState::_create_intermediate_merger(
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 83952411f46..d7589f59f9f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -672,68 +672,76 @@ Status 
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
     const bool used_too_much_memory =
             spill_streaming_agg_mem_limit > 0 && _memory_usage() > 
spill_streaming_agg_mem_limit;
     RETURN_IF_ERROR(std::visit(
-            [&](auto&& agg_method) -> Status {
-                auto& hash_tbl = *agg_method.hash_table;
-                /// If too much memory is used during the pre-aggregation 
stage,
-                /// it is better to output the data directly without 
performing further aggregation.
-                // do not try to do agg, just init and serialize directly 
return the out_block
-                if (used_too_much_memory || 
(hash_tbl.add_elem_size_overflow(rows) &&
-                                             
!_should_expand_preagg_hash_tables())) {
-                    SCOPED_TIMER(_streaming_agg_timer);
-                    ret_flag = true;
-
-                    // will serialize value data to string column.
-                    // non-nullable column(id in `_make_nullable_keys`)
-                    // will be converted to nullable.
-                    bool mem_reuse = p._make_nullable_keys.empty() && 
out_block->mem_reuse();
-
-                    std::vector<vectorized::DataTypePtr> data_types;
-                    vectorized::MutableColumns value_columns;
-                    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-                        auto data_type =
-                                
_aggregate_evaluators[i]->function()->get_serialized_type();
-                        if (mem_reuse) {
-                            value_columns.emplace_back(
-                                    std::move(*out_block->get_by_position(i + 
key_size).column)
-                                            .mutate());
-                        } else {
-                            // slot type of value it should always be string 
type
-                            value_columns.emplace_back(_aggregate_evaluators[i]
-                                                               ->function()
-                                                               
->create_serialize_column());
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> Status {
+                        return Status::InternalError("Uninited hash table");
+                    },
+                    [&](auto& agg_method) -> Status {
+                        auto& hash_tbl = *agg_method.hash_table;
+                        /// If too much memory is used during the 
pre-aggregation stage,
+                        /// it is better to output the data directly without 
performing further aggregation.
+                        // do not try to do agg, just init and serialize 
directly return the out_block
+                        if (used_too_much_memory || 
(hash_tbl.add_elem_size_overflow(rows) &&
+                                                     
!_should_expand_preagg_hash_tables())) {
+                            SCOPED_TIMER(_streaming_agg_timer);
+                            ret_flag = true;
+
+                            // will serialize value data to string column.
+                            // non-nullable column(id in `_make_nullable_keys`)
+                            // will be converted to nullable.
+                            bool mem_reuse =
+                                    p._make_nullable_keys.empty() && 
out_block->mem_reuse();
+
+                            std::vector<vectorized::DataTypePtr> data_types;
+                            vectorized::MutableColumns value_columns;
+                            for (int i = 0; i < _aggregate_evaluators.size(); 
++i) {
+                                auto data_type =
+                                        
_aggregate_evaluators[i]->function()->get_serialized_type();
+                                if (mem_reuse) {
+                                    value_columns.emplace_back(
+                                            
std::move(*out_block->get_by_position(i + key_size)
+                                                               .column)
+                                                    .mutate());
+                                } else {
+                                    // slot type of value it should always be 
string type
+                                    
value_columns.emplace_back(_aggregate_evaluators[i]
+                                                                       
->function()
+                                                                       
->create_serialize_column());
+                                }
+                                data_types.emplace_back(data_type);
+                            }
+
+                            for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
+                                SCOPED_TIMER(_serialize_data_timer);
+                                RETURN_IF_ERROR(
+                                        
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+                                                in_block, value_columns[i], 
rows,
+                                                _agg_arena_pool.get()));
+                            }
+
+                            if (!mem_reuse) {
+                                vectorized::ColumnsWithTypeAndName 
columns_with_schema;
+                                for (int i = 0; i < key_size; ++i) {
+                                    columns_with_schema.emplace_back(
+                                            
key_columns[i]->clone_resized(rows),
+                                            
_probe_expr_ctxs[i]->root()->data_type(),
+                                            
_probe_expr_ctxs[i]->root()->expr_name());
+                                }
+                                for (int i = 0; i < value_columns.size(); ++i) 
{
+                                    
columns_with_schema.emplace_back(std::move(value_columns[i]),
+                                                                     
data_types[i], "");
+                                }
+                                
out_block->swap(vectorized::Block(columns_with_schema));
+                            } else {
+                                for (int i = 0; i < key_size; ++i) {
+                                    
std::move(*out_block->get_by_position(i).column)
+                                            .mutate()
+                                            
->insert_range_from(*key_columns[i], 0, rows);
+                                }
+                            }
                         }
-                        data_types.emplace_back(data_type);
-                    }
-
-                    for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
-                        SCOPED_TIMER(_serialize_data_timer);
-                        
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
-                                in_block, value_columns[i], rows, 
_agg_arena_pool.get()));
-                    }
-
-                    if (!mem_reuse) {
-                        vectorized::ColumnsWithTypeAndName columns_with_schema;
-                        for (int i = 0; i < key_size; ++i) {
-                            columns_with_schema.emplace_back(
-                                    key_columns[i]->clone_resized(rows),
-                                    _probe_expr_ctxs[i]->root()->data_type(),
-                                    _probe_expr_ctxs[i]->root()->expr_name());
-                        }
-                        for (int i = 0; i < value_columns.size(); ++i) {
-                            
columns_with_schema.emplace_back(std::move(value_columns[i]),
-                                                             data_types[i], 
"");
-                        }
-                        
out_block->swap(vectorized::Block(columns_with_schema));
-                    } else {
-                        for (int i = 0; i < key_size; ++i) {
-                            std::move(*out_block->get_by_position(i).column)
-                                    .mutate()
-                                    ->insert_range_from(*key_columns[i], 0, 
rows);
-                        }
-                    }
-                }
-                return Status::OK();
-            },
+                        return Status::OK();
+                    }},
             _agg_data->method_variant));
 
     if (!ret_flag) {
diff --git a/be/src/vec/spill/spill_stream.cpp 
b/be/src/vec/spill/spill_stream.cpp
index b9c27a9d6ae..0ac9f95563e 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -42,13 +42,17 @@ SpillStream::SpillStream(RuntimeState* state, int64_t 
stream_id, SpillDataDir* d
           spill_dir_(std::move(spill_dir)),
           batch_rows_(batch_rows),
           batch_bytes_(batch_bytes),
+          query_id_(state->query_id()),
           profile_(profile) {}
 
 SpillStream::~SpillStream() {
     bool exists = false;
     auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
     if (status.ok() && exists) {
-        auto gc_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), 
SPILL_GC_DIR_PREFIX,
+        auto query_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), 
SPILL_GC_DIR_PREFIX,
+                                     print_id(query_id_));
+        (void)io::global_local_filesystem()->create_directory(query_dir);
+        auto gc_dir = fmt::format("{}/{}", query_dir,
                                   
std::filesystem::path(spill_dir_).filename().string());
         (void)io::global_local_filesystem()->rename(spill_dir_, gc_dir);
     }
@@ -62,7 +66,7 @@ Status SpillStream::prepare() {
 }
 
 const TUniqueId& SpillStream::query_id() const {
-    return state_->query_id();
+    return query_id_;
 }
 
 const std::string& SpillStream::get_spill_root_dir() const {
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index cadfa6fb6d4..8751b406608 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -40,6 +40,8 @@ public:
                 std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                 RuntimeProfile* profile);
 
+    SpillStream() = delete;
+
     ~SpillStream();
 
     int64_t id() const { return stream_id_; }
@@ -99,6 +101,8 @@ private:
     SpillWriterUPtr writer_;
     SpillReaderUPtr reader_;
 
+    TUniqueId query_id_;
+
     RuntimeProfile* profile_ = nullptr;
     RuntimeProfile::Counter* write_wait_io_timer_ = nullptr;
     RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to