This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3fc1c03dd93 branch-4.0: [chore](spill) refactor lambda to function #59584 (#59592)
3fc1c03dd93 is described below

commit 3fc1c03dd93bbd3a50bdda3a8b031286f35e733d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 7 16:54:47 2026 +0800

    branch-4.0: [chore](spill) refactor lambda to function #59584 (#59592)
    
    Cherry-picked from #59584
    
    Co-authored-by: TengJianPing <[email protected]>
---
 .../exec/partitioned_aggregation_sink_operator.cpp | 112 ++++----
 .../exec/partitioned_aggregation_sink_operator.h   |   2 +
 .../exec/partitioned_hash_join_sink_operator.cpp   | 294 +++++++++++----------
 .../exec/partitioned_hash_join_sink_operator.h     |   7 +
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 126 +++++----
 be/src/pipeline/exec/spill_sort_sink_operator.h    |   2 +
 .../pipeline/exec/spill_sort_source_operator.cpp   | 173 ++++++------
 be/src/pipeline/exec/spill_sort_source_operator.h  |   3 +
 8 files changed, 368 insertions(+), 351 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 2594221e153..391f30d82d1 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -405,6 +405,57 @@ Status 
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
     return Status::OK();
 }
 
+Status PartitionedAggSinkLocalState::_execute_spill_process(RuntimeState* 
state,
+                                                            size_t 
size_to_revoke) {
+    Status status;
+    auto& parent = Base::_parent->template cast<Parent>();
+    auto query_id = state->query_id();
+
+    
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
+        status = Status::InternalError("fault_inject partitioned_agg_sink 
revoke_memory canceled");
+        state->get_query_ctx()->cancel(status);
+        return status;
+    });
+
+    Defer defer {[&]() {
+        if (!status.ok() || state->is_cancelled()) {
+            if (!status.ok()) {
+                LOG(WARNING) << fmt::format(
+                        "Query:{}, agg sink:{}, task:{}, revoke_memory 
error:{}",
+                        print_id(query_id), Base::_parent->node_id(), 
state->task_id(), status);
+            }
+            _shared_state->close();
+        } else {
+            LOG(INFO) << fmt::format(
+                    "Query:{}, agg sink:{}, task:{}, revoke_memory finish, 
eos:{}, revocable "
+                    "memory:{}",
+                    print_id(state->query_id()), _parent->node_id(), 
state->task_id(), _eos,
+                    
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
+        }
+
+        if (_eos) {
+            Base::_dependency->set_ready_to_read();
+        }
+        
state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
+    }};
+
+    auto* runtime_state = _runtime_state.get();
+    auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
+    status = std::visit(
+            vectorized::Overload {[&](std::monostate& arg) -> Status {
+                                      return Status::InternalError("Unit hash 
table");
+                                  },
+                                  [&](auto& agg_method) -> Status {
+                                      auto& hash_table = 
*agg_method.hash_table;
+                                      RETURN_IF_CATCH_EXCEPTION(return 
_spill_hash_table(
+                                              state, agg_method, hash_table, 
size_to_revoke, _eos));
+                                  }},
+            agg_data->method_variant);
+    RETURN_IF_ERROR(status);
+    status = parent._agg_sink_operator->reset_hash_table(runtime_state);
+    return status;
+}
+
 Status PartitionedAggSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     const auto size_to_revoke = _parent->revocable_mem_size(state);
@@ -423,9 +474,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
         update_profile<true>(sink_local_state->custom_profile());
     }
 
-    auto& parent = Base::_parent->template cast<Parent>();
-    auto query_id = state->query_id();
-
     
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func",
 {
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_agg_sink revoke_memory submit_func 
failed");
@@ -433,60 +481,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(
 
     
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
 
-    SpillSinkRunnable spill_runnable(
-            state, spill_context, operator_profile(),
-            [this, &parent, state, query_id, size_to_revoke] {
-                Status status;
-                
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
-                    status = Status::InternalError(
-                            "fault_inject partitioned_agg_sink "
-                            "revoke_memory canceled");
-                    state->get_query_ctx()->cancel(status);
-                    return status;
-                });
-                Defer defer {[&]() {
-                    if (!status.ok() || state->is_cancelled()) {
-                        if (!status.ok()) {
-                            LOG(WARNING) << fmt::format(
-                                    "Query:{}, agg sink:{}, task:{}, 
revoke_memory error:{}",
-                                    print_id(query_id), 
Base::_parent->node_id(), state->task_id(),
-                                    status);
-                        }
-                        _shared_state->close();
-                    } else {
-                        LOG(INFO) << fmt::format(
-                                "Query:{}, agg sink:{}, task:{}, revoke_memory 
finish, eos:{}, "
-                                "revocable memory:{}",
-                                print_id(state->query_id()), 
_parent->node_id(), state->task_id(),
-                                _eos,
-                                
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
-                    }
-
-                    if (_eos) {
-                        Base::_dependency->set_ready_to_read();
-                    }
-                    state->get_query_ctx()
-                            ->resource_ctx()
-                            ->task_controller()
-                            ->decrease_revoking_tasks_count();
-                }};
-                auto* runtime_state = _runtime_state.get();
-                auto* agg_data = 
parent._agg_sink_operator->get_agg_data(runtime_state);
-                status = std::visit(
-                        vectorized::Overload {
-                                [&](std::monostate& arg) -> Status {
-                                    return Status::InternalError("Unit hash 
table");
-                                },
-                                [&](auto& agg_method) -> Status {
-                                    auto& hash_table = *agg_method.hash_table;
-                                    RETURN_IF_CATCH_EXCEPTION(return 
_spill_hash_table(
-                                            state, agg_method, hash_table, 
size_to_revoke, _eos));
-                                }},
-                        agg_data->method_variant);
-                RETURN_IF_ERROR(status);
-                status = 
parent._agg_sink_operator->reset_hash_table(runtime_state);
-                return status;
-            });
+    SpillSinkRunnable spill_runnable(state, spill_context, operator_profile(),
+                                     [this, state, size_to_revoke] {
+                                         return _execute_spill_process(state, 
size_to_revoke);
+                                     });
 
     return spill_runnable.run();
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6f17533a97d..4af8c110ac7 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -49,6 +49,8 @@ public:
 
     Status revoke_memory(RuntimeState* state, const 
std::shared_ptr<SpillContext>& spill_context);
 
+    Status _execute_spill_process(RuntimeState* state, size_t size_to_revoke);
+
     Status setup_in_memory_agg_op(RuntimeState* state);
 
     template <bool spilled>
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index ae3ef2c4d57..fb05e6bc8b3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -166,6 +166,93 @@ Dependency* 
PartitionedHashJoinSinkLocalState::finishdependency() {
     return _finish_dependency.get();
 }
 
+Status PartitionedHashJoinSinkLocalState::_execute_spill_unpartitioned_block(
+        RuntimeState* state, vectorized::Block&& build_block) {
+    Defer defer1 {[&]() { update_memory_usage(); }};
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count);
+
+    const size_t reserved_size = 4096;
+    std::ranges::for_each(partitions_indexes,
+                          [](std::vector<uint32_t>& indices) { 
indices.reserve(reserved_size); });
+
+    size_t total_rows = build_block.rows();
+    size_t offset = 1;
+    while (offset < total_rows) {
+        auto sub_block = build_block.clone_empty();
+        size_t this_run = std::min(reserved_size, total_rows - offset);
+
+        for (size_t i = 0; i != build_block.columns(); ++i) {
+            sub_block.get_by_position(i).column =
+                    build_block.get_by_position(i).column->cut(offset, 
this_run);
+        }
+        int64_t sub_blocks_memory_usage = sub_block.allocated_bytes();
+        COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage);
+        Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, 
-sub_blocks_memory_usage); }};
+
+        offset += this_run;
+        const auto is_last_block = offset == total_rows;
+
+        {
+            SCOPED_TIMER(_partition_timer);
+            (void)_partitioner->do_partitioning(state, &sub_block);
+        }
+
+        const auto* channel_ids = 
_partitioner->get_channel_ids().get<uint32_t>();
+        for (size_t i = 0; i != sub_block.rows(); ++i) {
+            partitions_indexes[channel_ids[i]].emplace_back(i);
+        }
+
+        for (uint32_t partition_idx = 0; partition_idx != p._partition_count; 
++partition_idx) {
+            auto* begin = partitions_indexes[partition_idx].data();
+            auto* end = begin + partitions_indexes[partition_idx].size();
+            auto& partition_block = partitioned_blocks[partition_idx];
+            vectorized::SpillStreamSPtr& spilling_stream =
+                    _shared_state->spilled_streams[partition_idx];
+            if (UNLIKELY(!partition_block)) {
+                partition_block =
+                        
vectorized::MutableBlock::create_unique(build_block.clone_empty());
+            }
+
+            int64_t old_mem = partition_block->allocated_bytes();
+            {
+                SCOPED_TIMER(_partition_shuffle_timer);
+                RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, 
end));
+                partitions_indexes[partition_idx].clear();
+            }
+            int64_t new_mem = partition_block->allocated_bytes();
+
+            if (partition_block->rows() >= reserved_size || is_last_block) {
+                auto block = partition_block->to_block();
+                RETURN_IF_ERROR(spilling_stream->spill_block(state, block, 
false));
+                partition_block =
+                        
vectorized::MutableBlock::create_unique(build_block.clone_empty());
+                COUNTER_UPDATE(_memory_used_counter, -new_mem);
+            } else {
+                COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem);
+            }
+        }
+    }
+
+    Status status;
+    if (_child_eos) {
+        std::ranges::for_each(_shared_state->partitioned_build_blocks, 
[&](auto& block) {
+            if (block) {
+                COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+            }
+        });
+        status = _finish_spilling();
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, hash join sink:{}, task:{}, 
_revoke_unpartitioned_block, "
+                "set_ready_to_read",
+                print_id(state->query_id()), _parent->node_id(), 
state->task_id());
+        _dependency->set_ready_to_read();
+    }
+
+    return status;
+}
+
 Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
@@ -210,96 +297,11 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         COUNTER_UPDATE(_memory_used_counter, build_block.allocated_bytes() - 
block_old_mem);
     }
 
-    auto spill_func = [build_block = std::move(build_block), state, this]() 
mutable {
-        Defer defer1 {[&]() { update_memory_usage(); }};
-        auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
-        auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
-        std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
-
-        const size_t reserved_size = 4096;
-        std::ranges::for_each(partitions_indexes, [](std::vector<uint32_t>& 
indices) {
-            indices.reserve(reserved_size);
-        });
-
-        size_t total_rows = build_block.rows();
-        size_t offset = 1;
-        while (offset < total_rows) {
-            auto sub_block = build_block.clone_empty();
-            size_t this_run = std::min(reserved_size, total_rows - offset);
-
-            for (size_t i = 0; i != build_block.columns(); ++i) {
-                sub_block.get_by_position(i).column =
-                        build_block.get_by_position(i).column->cut(offset, 
this_run);
-            }
-            int64_t sub_blocks_memory_usage = sub_block.allocated_bytes();
-            COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage);
-            Defer defer2 {
-                    [&]() { COUNTER_UPDATE(_memory_used_counter, 
-sub_blocks_memory_usage); }};
-
-            offset += this_run;
-            const auto is_last_block = offset == total_rows;
-
-            {
-                SCOPED_TIMER(_partition_timer);
-                (void)_partitioner->do_partitioning(state, &sub_block);
-            }
-
-            const auto* channel_ids = 
_partitioner->get_channel_ids().get<uint32_t>();
-            for (size_t i = 0; i != sub_block.rows(); ++i) {
-                partitions_indexes[channel_ids[i]].emplace_back(i);
-            }
-
-            for (uint32_t partition_idx = 0; partition_idx != 
p._partition_count; ++partition_idx) {
-                auto* begin = partitions_indexes[partition_idx].data();
-                auto* end = begin + partitions_indexes[partition_idx].size();
-                auto& partition_block = partitioned_blocks[partition_idx];
-                vectorized::SpillStreamSPtr& spilling_stream =
-                        _shared_state->spilled_streams[partition_idx];
-                if (UNLIKELY(!partition_block)) {
-                    partition_block =
-                            
vectorized::MutableBlock::create_unique(build_block.clone_empty());
-                }
-
-                int64_t old_mem = partition_block->allocated_bytes();
-                {
-                    SCOPED_TIMER(_partition_shuffle_timer);
-                    RETURN_IF_ERROR(partition_block->add_rows(&sub_block, 
begin, end));
-                    partitions_indexes[partition_idx].clear();
-                }
-                int64_t new_mem = partition_block->allocated_bytes();
-
-                if (partition_block->rows() >= reserved_size || is_last_block) 
{
-                    auto block = partition_block->to_block();
-                    RETURN_IF_ERROR(spilling_stream->spill_block(state, block, 
false));
-                    partition_block =
-                            
vectorized::MutableBlock::create_unique(build_block.clone_empty());
-                    COUNTER_UPDATE(_memory_used_counter, -new_mem);
-                } else {
-                    COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem);
-                }
-            }
-        }
-
-        Status status;
-        if (_child_eos) {
-            std::ranges::for_each(_shared_state->partitioned_build_blocks, 
[&](auto& block) {
-                if (block) {
-                    COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                }
-            });
-            status = _finish_spilling();
-            VLOG_DEBUG << fmt::format(
-                    "Query:{}, hash join sink:{}, task:{}, 
_revoke_unpartitioned_block, "
-                    "set_ready_to_read",
-                    print_id(state->query_id()), _parent->node_id(), 
state->task_id());
-            _dependency->set_ready_to_read();
-        }
-
-        return status;
-    };
-
-    auto exception_catch_func = [spill_func]() mutable {
-        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); 
}();
+    auto exception_catch_func = [this, state, build_block = 
std::move(build_block)]() mutable {
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(
+                    return _execute_spill_unpartitioned_block(state, 
std::move(build_block)));
+        }();
         return status;
     };
 
@@ -330,6 +332,60 @@ Status 
PartitionedHashJoinSinkLocalState::terminate(RuntimeState* state) {
     return 
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>::terminate(state);
 }
 
+Status PartitionedHashJoinSinkLocalState::_finish_spilling_callback(
+        RuntimeState* state, TUniqueId query_id,
+        const std::shared_ptr<SpillContext>& spill_context) {
+    Status status;
+    if (_child_eos) {
+        LOG(INFO) << fmt::format(
+                "Query:{}, hash join sink:{}, task:{}, finish spilling, 
set_ready_to_read",
+                print_id(query_id), _parent->node_id(), state->task_id());
+        std::ranges::for_each(_shared_state->partitioned_build_blocks, 
[&](auto& block) {
+            if (block) {
+                COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+            }
+        });
+        status = _finish_spilling();
+        _dependency->set_ready_to_read();
+    }
+
+    if (spill_context) {
+        spill_context->on_task_finished();
+    }
+
+    return status;
+}
+
+Status 
PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState*
 state,
+                                                                            
TUniqueId query_id) {
+    
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
 {
+        auto status = Status::InternalError(
+                "fault_inject partitioned_hash_join_sink revoke_memory 
canceled");
+        state->get_query_ctx()->cancel(status);
+        return status;
+    });
+    SCOPED_TIMER(_spill_build_timer);
+
+    for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); 
++i) {
+        vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
+        DCHECK(spilling_stream != nullptr);
+        auto& mutable_block = _shared_state->partitioned_build_blocks[i];
+
+        if (!mutable_block ||
+            mutable_block->allocated_bytes() < 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+            continue;
+        }
+
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(
+                    return _spill_to_disk(static_cast<uint32_t>(i), 
spilling_stream));
+        }();
+
+        RETURN_IF_ERROR(status);
+    }
+    return Status::OK();
+}
+
 Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     SCOPED_TIMER(_spill_total_timer);
@@ -344,62 +400,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     }
 
     const auto query_id = state->query_id();
-    auto spill_fin_cb = [this, state, query_id, spill_context]() {
-        Status status;
-        if (_child_eos) {
-            LOG(INFO) << fmt::format(
-                    "Query:{}, hash join sink:{}, task:{}, finish spilling, 
set_ready_to_read",
-                    print_id(query_id), _parent->node_id(), state->task_id());
-            std::ranges::for_each(_shared_state->partitioned_build_blocks, 
[&](auto& block) {
-                if (block) {
-                    COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                }
-            });
-            status = _finish_spilling();
-            _dependency->set_ready_to_read();
-        }
-
-        if (spill_context) {
-            spill_context->on_task_finished();
-        }
-
-        return status;
-    };
-
     SpillSinkRunnable spill_runnable(
             state, nullptr, operator_profile(),
-            [this, state, query_id] {
-                
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
 {
-                    auto status = Status::InternalError(
-                            "fault_inject partitioned_hash_join_sink "
-                            "revoke_memory canceled");
-                    state->get_query_ctx()->cancel(status);
-                    return status;
-                });
-                SCOPED_TIMER(_spill_build_timer);
-
-                for (size_t i = 0; i != 
_shared_state->partitioned_build_blocks.size(); ++i) {
-                    vectorized::SpillStreamSPtr& spilling_stream =
-                            _shared_state->spilled_streams[i];
-                    DCHECK(spilling_stream != nullptr);
-                    auto& mutable_block = 
_shared_state->partitioned_build_blocks[i];
-
-                    if (!mutable_block ||
-                        mutable_block->allocated_bytes() <
-                                
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                        continue;
-                    }
-
-                    auto status = [&]() {
-                        RETURN_IF_CATCH_EXCEPTION(
-                                return 
_spill_to_disk(static_cast<uint32_t>(i), spilling_stream));
-                    }();
-
-                    RETURN_IF_ERROR(status);
-                }
-                return Status::OK();
-            },
-            spill_fin_cb);
+            [this, state, query_id] { return 
_execute_spill_partitioned_blocks(state, query_id); },
+            [this, state, query_id, spill_context]() {
+                return _finish_spilling_callback(state, query_id, 
spill_context);
+            });
 
     return spill_runnable.run();
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index a2fd8ea69ee..5d7cd5263db 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -71,8 +71,15 @@ protected:
     Status _revoke_unpartitioned_block(RuntimeState* state,
                                        const std::shared_ptr<SpillContext>& 
spill_context);
 
+    Status _execute_spill_unpartitioned_block(RuntimeState* state, 
vectorized::Block&& build_block);
+
     Status _finish_spilling();
 
+    Status _finish_spilling_callback(RuntimeState* state, TUniqueId query_id,
+                                     const std::shared_ptr<SpillContext>& 
spill_context);
+
+    Status _execute_spill_partitioned_blocks(RuntimeState* state, TUniqueId 
query_id);
+
     Status _setup_internal_operator(RuntimeState* state);
 
     friend class PartitionedHashJoinSinkOperatorX;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index bac215e3f3c..1880f1d2e76 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -179,6 +179,62 @@ size_t 
SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
                                                                           eos);
 }
 
+Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state, 
TUniqueId query_id) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    Status status;
+    Defer defer {[&]() {
+        if (!status.ok() || state->is_cancelled()) {
+            if (!status.ok()) {
+                LOG(WARNING) << fmt::format(
+                        "Query:{}, sort sink:{}, task:{}, revoke memory 
error:{}",
+                        print_id(query_id), _parent->node_id(), 
state->task_id(), status);
+            }
+            _shared_state->close();
+        } else {
+            VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke 
memory finish",
+                                      print_id(query_id), _parent->node_id(), 
state->task_id());
+        }
+
+        if (!status.ok()) {
+            _shared_state->close();
+        }
+
+        _spilling_stream.reset();
+        
state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
+        if (_eos) {
+            _dependency->set_ready_to_read();
+        }
+    }};
+
+    status = 
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+    RETURN_IF_ERROR(status);
+
+    auto* sink_local_state = _runtime_state->get_sink_local_state();
+    update_profile(sink_local_state->custom_profile());
+
+    bool eos = false;
+    vectorized::Block block;
+
+    int32_t batch_size =
+            _shared_state->spill_block_batch_row_count > 
std::numeric_limits<int32_t>::max()
+                    ? std::numeric_limits<int32_t>::max()
+                    : 
static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
+    while (!eos && !state->is_cancelled()) {
+        {
+            SCOPED_TIMER(_spill_merge_sort_timer);
+            status = parent._sort_sink_operator->merge_sort_read_for_spill(
+                    _runtime_state.get(), &block, batch_size, &eos);
+        }
+        RETURN_IF_ERROR(status);
+        status = _spilling_stream->spill_block(state, block, eos);
+        RETURN_IF_ERROR(status);
+        block.clear_column_data();
+    }
+    parent._sort_sink_operator->reset(_runtime_state.get());
+
+    return Status::OK();
+}
+
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                               const 
std::shared_ptr<SpillContext>& spill_context) {
     auto& parent = Base::_parent->template cast<Parent>();
@@ -205,75 +261,17 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     _shared_state->sorted_streams.emplace_back(_spilling_stream);
 
     auto query_id = state->query_id();
-
-    auto spill_func = [this, state, query_id, &parent] {
-        Status status;
-        Defer defer {[&]() {
-            if (!status.ok() || state->is_cancelled()) {
-                if (!status.ok()) {
-                    LOG(WARNING) << fmt::format(
-                            "Query:{}, sort sink:{}, task:{}, revoke memory 
error:{}",
-                            print_id(query_id), _parent->node_id(), 
state->task_id(), status);
-                }
-                _shared_state->close();
-            } else {
-                VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, 
revoke memory finish",
-                                          print_id(query_id), 
_parent->node_id(), state->task_id());
-            }
-
-            if (!status.ok()) {
-                _shared_state->close();
-            }
-
-            _spilling_stream.reset();
-            state->get_query_ctx()
-                    ->resource_ctx()
-                    ->task_controller()
-                    ->decrease_revoking_tasks_count();
-            if (_eos) {
-                _dependency->set_ready_to_read();
-            }
-        }};
-
-        status = 
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
-        RETURN_IF_ERROR(status);
-
-        auto* sink_local_state = _runtime_state->get_sink_local_state();
-        update_profile(sink_local_state->custom_profile());
-
-        bool eos = false;
-        vectorized::Block block;
-
-        int32_t batch_size =
-                _shared_state->spill_block_batch_row_count > 
std::numeric_limits<int32_t>::max()
-                        ? std::numeric_limits<int32_t>::max()
-                        : 
static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
-        while (!eos && !state->is_cancelled()) {
-            {
-                SCOPED_TIMER(_spill_merge_sort_timer);
-                status = parent._sort_sink_operator->merge_sort_read_for_spill(
-                        _runtime_state.get(), &block, batch_size, &eos);
-            }
-            RETURN_IF_ERROR(status);
-            status = _spilling_stream->spill_block(state, block, eos);
-            RETURN_IF_ERROR(status);
-            block.clear_column_data();
-        }
-        parent._sort_sink_operator->reset(_runtime_state.get());
-
-        return Status::OK();
-    };
-
-    auto exception_catch_func = [query_id, state, spill_func]() {
+    auto exception_catch_func = [this, query_id, state]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
-            auto status = Status::InternalError(
-                    "fault_inject spill_sort_sink "
-                    "revoke_memory canceled");
+            auto status =
+                    Status::InternalError("fault_inject spill_sort_sink 
revoke_memory canceled");
             state->get_query_ctx()->cancel(status);
             return status;
         });
 
-        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION({ return _execute_spill_sort(state, 
query_id); });
+        }();
 
         return status;
     };
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 49a57c0ddda..84c5be478c5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -50,6 +50,8 @@ private:
     void _init_counters();
     void update_profile(RuntimeProfile* child_profile);
 
+    Status _execute_spill_sort(RuntimeState* state, TUniqueId query_id);
+
     friend class SpillSortSinkOperatorX;
 
     std::unique_ptr<RuntimeState> _runtime_state;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 66752161af6..d137574602f 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -72,104 +72,105 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
     return std::max(2, static_cast<int32_t>(count));
 }
 
-Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
+Status SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* state,
+                                                              TUniqueId query_id) {
     auto& parent = Base::_parent->template cast<Parent>();
-    VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data",
-                              print_id(state->query_id()), _parent->node_id(), state->task_id());
-
-    auto query_id = state->query_id();
-
-    auto spill_func = [this, state, query_id, &parent] {
-        SCOPED_TIMER(_spill_merge_sort_timer);
-        Status status;
-        Defer defer {[&]() {
-            if (!status.ok() || state->is_cancelled()) {
-                if (!status.ok()) {
-                    LOG(WARNING) << fmt::format(
-                            "Query:{}, sort source:{}, task:{}, merge spill data error:{}",
-                            print_id(query_id), _parent->node_id(), state->task_id(), status);
-                }
-                for (auto& stream : _current_merging_streams) {
-                    ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-                }
-                _current_merging_streams.clear();
-            } else {
-                VLOG_DEBUG << fmt::format(
-                        "Query:{}, sort source:{}, task:{}, merge spill data finish",
-                        print_id(query_id), _parent->node_id(), state->task_id());
+    SCOPED_TIMER(_spill_merge_sort_timer);
+    Status status;
+    Defer defer {[&]() {
+        if (!status.ok() || state->is_cancelled()) {
+            if (!status.ok()) {
+                LOG(WARNING) << fmt::format(
+                        "Query:{}, sort source:{}, task:{}, merge spill data error:{}",
+                        print_id(query_id), _parent->node_id(), state->task_id(), status);
             }
-        }};
-        vectorized::Block merge_sorted_block;
-        vectorized::SpillStreamSPtr tmp_stream;
-        while (!state->is_cancelled()) {
-            int max_stream_count = _calc_spill_blocks_to_merge(state);
-            VLOG_DEBUG << fmt::format(
-                    "Query:{}, sort source:{}, task:{}, merge spill streams, streams count:{}, "
-                    "curren merge max stream count:{}",
-                    print_id(query_id), _parent->node_id(), state->task_id(),
-                    _shared_state->sorted_streams.size(), max_stream_count);
-            {
-                SCOPED_TIMER(Base::_spill_recover_time);
-                status = _create_intermediate_merger(
-                        max_stream_count,
-                        parent._sort_source_operator->get_sort_description(_runtime_state.get()));
+            for (auto& stream : _current_merging_streams) {
+                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
             }
-            RETURN_IF_ERROR(status);
+            _current_merging_streams.clear();
+        } else {
+            VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data finish",
+                                      print_id(query_id), _parent->node_id(), state->task_id());
+        }
+    }};
+    vectorized::Block merge_sorted_block;
+    vectorized::SpillStreamSPtr tmp_stream;
+    while (!state->is_cancelled()) {
+        int max_stream_count = _calc_spill_blocks_to_merge(state);
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, sort source:{}, task:{}, merge spill streams, streams count:{}, "
+                "curren merge max stream count:{}",
+                print_id(query_id), _parent->node_id(), state->task_id(),
+                _shared_state->sorted_streams.size(), max_stream_count);
+        {
+            SCOPED_TIMER(Base::_spill_recover_time);
+            status = _create_intermediate_merger(
+                    max_stream_count,
+                    parent._sort_source_operator->get_sort_description(_runtime_state.get()));
+        }
+        RETURN_IF_ERROR(status);
 
-            // all the remaining streams can be merged in a run
-            if (_shared_state->sorted_streams.empty()) {
-                return Status::OK();
-            }
+        // all the remaining streams can be merged in a run
+        if (_shared_state->sorted_streams.empty()) {
+            return Status::OK();
+        }
 
-            {
-                int32_t batch_size =
-                        _shared_state->spill_block_batch_row_count >
-                                        std::numeric_limits<int32_t>::max()
-                                ? std::numeric_limits<int32_t>::max()
-                                : static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
-                status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                        state, tmp_stream, print_id(state->query_id()), "sort", _parent->node_id(),
-                        batch_size, state->spill_sort_batch_bytes(), operator_profile());
-                RETURN_IF_ERROR(status);
+        {
+            int32_t batch_size =
+                    _shared_state->spill_block_batch_row_count > std::numeric_limits<int32_t>::max()
+                            ? std::numeric_limits<int32_t>::max()
+                            : static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
+            status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                    state, tmp_stream, print_id(state->query_id()), "sort", _parent->node_id(),
+                    batch_size, state->spill_sort_batch_bytes(), operator_profile());
+            RETURN_IF_ERROR(status);
 
-                _shared_state->sorted_streams.emplace_back(tmp_stream);
-
-                bool eos = false;
-                while (!eos && !state->is_cancelled()) {
-                    merge_sorted_block.clear_column_data();
-                    {
-                        SCOPED_TIMER(Base::_spill_recover_time);
-                        DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
-                            status = Status::Error<INTERNAL_ERROR>(
-                                    "fault_inject spill_sort_source "
-                                    "recover_spill_data failed");
-                        });
-                        if (status.ok()) {
-                            status = _merger->get_next(&merge_sorted_block, &eos);
-                        }
-                    }
-                    RETURN_IF_ERROR(status);
-                    status = tmp_stream->spill_block(state, merge_sorted_block, eos);
+            _shared_state->sorted_streams.emplace_back(tmp_stream);
+
+            bool eos = false;
+            while (!eos && !state->is_cancelled()) {
+                merge_sorted_block.clear_column_data();
+                {
+                    SCOPED_TIMER(Base::_spill_recover_time);
+                    DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
+                        status = Status::Error<INTERNAL_ERROR>(
+                                "fault_inject spill_sort_source "
+                                "recover_spill_data failed");
+                    });
                     if (status.ok()) {
-                        DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
-                            status = Status::Error<INTERNAL_ERROR>(
-                                    "fault_inject spill_sort_source "
-                                    "spill_merged_data failed");
-                        });
+                        status = _merger->get_next(&merge_sorted_block, &eos);
                     }
-                    RETURN_IF_ERROR(status);
                 }
+                RETURN_IF_ERROR(status);
+                status = tmp_stream->spill_block(state, merge_sorted_block, eos);
+                if (status.ok()) {
+                    DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
+                        status = Status::Error<INTERNAL_ERROR>(
+                                "fault_inject spill_sort_source "
+                                "spill_merged_data failed");
+                    });
+                }
+                RETURN_IF_ERROR(status);
             }
-            for (auto& stream : _current_merging_streams) {
-                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-            }
-            _current_merging_streams.clear();
         }
-        return Status::OK();
-    };
+        for (auto& stream : _current_merging_streams) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+        }
+        _current_merging_streams.clear();
+    }
+    return Status::OK();
+}
 
-    auto exception_catch_func = [spill_func]() {
-        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
+Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
+    VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data",
+                              print_id(state->query_id()), _parent->node_id(), state->task_id());
+
+    auto query_id = state->query_id();
+    auto exception_catch_func = [this, state, query_id]() {
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(
+                    { return _execute_merge_sort_spill_streams(state, query_id); });
+        }();
         return status;
     };
 
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h
index d37d5373ea1..f191d3c419b 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -53,6 +53,9 @@ protected:
     int _calc_spill_blocks_to_merge(RuntimeState* state) const;
     Status _create_intermediate_merger(int num_blocks,
                                        const vectorized::SortDescription& sort_description);
+
+    Status _execute_merge_sort_spill_streams(RuntimeState* state, TUniqueId query_id);
+
     friend class SpillSortSourceOperatorX;
     std::unique_ptr<RuntimeState> _runtime_state;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to