This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3fc1c03dd93 branch-4.0: [chore](spill) refactor lambda to function #59584 (#59592)
3fc1c03dd93 is described below
commit 3fc1c03dd93bbd3a50bdda3a8b031286f35e733d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 7 16:54:47 2026 +0800
branch-4.0: [chore](spill) refactor lambda to function #59584 (#59592)
Cherry-picked from #59584
Co-authored-by: TengJianPing <[email protected]>
---
.../exec/partitioned_aggregation_sink_operator.cpp | 112 ++++----
.../exec/partitioned_aggregation_sink_operator.h | 2 +
.../exec/partitioned_hash_join_sink_operator.cpp | 294 +++++++++++----------
.../exec/partitioned_hash_join_sink_operator.h | 7 +
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 126 +++++----
be/src/pipeline/exec/spill_sort_sink_operator.h | 2 +
.../pipeline/exec/spill_sort_source_operator.cpp | 173 ++++++------
be/src/pipeline/exec/spill_sort_source_operator.h | 3 +
8 files changed, 368 insertions(+), 351 deletions(-)
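
Note for reviewers skimming the diff: every hunk below applies the same mechanical change. The body of a long capturing lambda handed to SpillSinkRunnable (or wrapped in an exception_catch_func) moves into a named private member function, and the call site keeps a one-line forwarding lambda. A minimal sketch of the pattern follows, using hypothetical stand-in types (State, Runnable, SinkLocalState, int-as-Status), not the real Doris classes:

    #include <cstddef>
    #include <functional>

    // Hypothetical stand-ins for illustration only -- the real code uses
    // Doris' RuntimeState, Status and SpillSinkRunnable.
    struct State {};

    struct Runnable {
        std::function<int()> work; // the spill job to run
        int run() { return work(); }
    };

    struct SinkLocalState {
        // After the refactor: the spill logic is a named member function that
        // can be read and tested on its own, instead of a ~50-line inline
        // lambda at the call site.
        int execute_spill_process(State* state, std::size_t size_to_revoke) {
            (void)state;          // the real code spills hash tables / sorted runs
            (void)size_to_revoke; // and handles cancellation via a Defer guard
            return 0;
        }

        int revoke_memory(State* state, std::size_t size_to_revoke) {
            // The call site keeps only a thin forwarding lambda, mirroring
            // SpillSinkRunnable(state, ..., [this, state, size_to_revoke] {...}).
            Runnable r {[this, state, size_to_revoke] {
                return execute_spill_process(state, size_to_revoke);
            }};
            return r.run();
        }
    };

Behavior is intended to be unchanged; the forwarding lambdas keep the capture lists short and make each spill path readable in isolation.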
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 2594221e153..391f30d82d1 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -405,6 +405,57 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
     return Status::OK();
 }
+Status PartitionedAggSinkLocalState::_execute_spill_process(RuntimeState* state,
+                                                            size_t size_to_revoke) {
+    Status status;
+    auto& parent = Base::_parent->template cast<Parent>();
+    auto query_id = state->query_id();
+
+    DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
+        status = Status::InternalError("fault_inject partitioned_agg_sink revoke_memory canceled");
+        state->get_query_ctx()->cancel(status);
+        return status;
+    });
+
+    Defer defer {[&]() {
+        if (!status.ok() || state->is_cancelled()) {
+            if (!status.ok()) {
+                LOG(WARNING) << fmt::format(
+                        "Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
+                        print_id(query_id), Base::_parent->node_id(), state->task_id(), status);
+            }
+            _shared_state->close();
+        } else {
+            LOG(INFO) << fmt::format(
+                    "Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, revocable "
+                    "memory:{}",
+                    print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos,
+                    PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
+        }
+
+        if (_eos) {
+            Base::_dependency->set_ready_to_read();
+        }
+        state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
+    }};
+
+    auto* runtime_state = _runtime_state.get();
+    auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
+    status = std::visit(
+            vectorized::Overload {[&](std::monostate& arg) -> Status {
+                                      return Status::InternalError("Unit hash table");
+                                  },
+                                  [&](auto& agg_method) -> Status {
+                                      auto& hash_table = *agg_method.hash_table;
+                                      RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
+                                              state, agg_method, hash_table, size_to_revoke, _eos));
+                                  }},
+            agg_data->method_variant);
+    RETURN_IF_ERROR(status);
+    status = parent._agg_sink_operator->reset_hash_table(runtime_state);
+    return status;
+}
+
 Status PartitionedAggSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     const auto size_to_revoke = _parent->revocable_mem_size(state);
@@ -423,9 +474,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
         update_profile<true>(sink_local_state->custom_profile());
     }
-    auto& parent = Base::_parent->template cast<Parent>();
-    auto query_id = state->query_id();
-
     DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func", {
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_agg_sink revoke_memory submit_func failed");
@@ -433,60 +481,10 @@
     state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
-    SpillSinkRunnable spill_runnable(
-            state, spill_context, operator_profile(),
-            [this, &parent, state, query_id, size_to_revoke] {
-                Status status;
-                DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
-                    status = Status::InternalError(
-                            "fault_inject partitioned_agg_sink "
-                            "revoke_memory canceled");
-                    state->get_query_ctx()->cancel(status);
-                    return status;
-                });
-                Defer defer {[&]() {
-                    if (!status.ok() || state->is_cancelled()) {
-                        if (!status.ok()) {
-                            LOG(WARNING) << fmt::format(
-                                    "Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
-                                    print_id(query_id), Base::_parent->node_id(), state->task_id(),
-                                    status);
-                        }
-                        _shared_state->close();
-                    } else {
-                        LOG(INFO) << fmt::format(
-                                "Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, "
-                                "revocable memory:{}",
-                                print_id(state->query_id()), _parent->node_id(), state->task_id(),
-                                _eos,
-                                PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
-                    }
-
-                    if (_eos) {
-                        Base::_dependency->set_ready_to_read();
-                    }
-                    state->get_query_ctx()
-                            ->resource_ctx()
-                            ->task_controller()
-                            ->decrease_revoking_tasks_count();
-                }};
-                auto* runtime_state = _runtime_state.get();
-                auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
-                status = std::visit(
-                        vectorized::Overload {
-                                [&](std::monostate& arg) -> Status {
-                                    return Status::InternalError("Unit hash table");
-                                },
-                                [&](auto& agg_method) -> Status {
-                                    auto& hash_table = *agg_method.hash_table;
-                                    RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
-                                            state, agg_method, hash_table, size_to_revoke, _eos));
-                                }},
-                        agg_data->method_variant);
-                RETURN_IF_ERROR(status);
-                status = parent._agg_sink_operator->reset_hash_table(runtime_state);
-                return status;
-            });
+    SpillSinkRunnable spill_runnable(state, spill_context, operator_profile(),
+                                     [this, state, size_to_revoke] {
+                                         return _execute_spill_process(state, size_to_revoke);
+                                     });
     return spill_runnable.run();
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6f17533a97d..4af8c110ac7 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -49,6 +49,8 @@ public:
     Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
+    Status _execute_spill_process(RuntimeState* state, size_t size_to_revoke);
+
     Status setup_in_memory_agg_op(RuntimeState* state);
     template <bool spilled>
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index ae3ef2c4d57..fb05e6bc8b3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -166,6 +166,93 @@ Dependency* PartitionedHashJoinSinkLocalState::finishdependency() {
     return _finish_dependency.get();
 }
+Status PartitionedHashJoinSinkLocalState::_execute_spill_unpartitioned_block(
+        RuntimeState* state, vectorized::Block&& build_block) {
+    Defer defer1 {[&]() { update_memory_usage(); }};
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count);
+
+    const size_t reserved_size = 4096;
+    std::ranges::for_each(partitions_indexes,
+                          [](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); });
+
+    size_t total_rows = build_block.rows();
+    size_t offset = 1;
+    while (offset < total_rows) {
+        auto sub_block = build_block.clone_empty();
+        size_t this_run = std::min(reserved_size, total_rows - offset);
+
+        for (size_t i = 0; i != build_block.columns(); ++i) {
+            sub_block.get_by_position(i).column =
+                    build_block.get_by_position(i).column->cut(offset, this_run);
+        }
+        int64_t sub_blocks_memory_usage = sub_block.allocated_bytes();
+        COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage);
+        Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, -sub_blocks_memory_usage); }};
+
+        offset += this_run;
+        const auto is_last_block = offset == total_rows;
+
+        {
+            SCOPED_TIMER(_partition_timer);
+            (void)_partitioner->do_partitioning(state, &sub_block);
+        }
+
+        const auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
+        for (size_t i = 0; i != sub_block.rows(); ++i) {
+            partitions_indexes[channel_ids[i]].emplace_back(i);
+        }
+
+        for (uint32_t partition_idx = 0; partition_idx != p._partition_count; ++partition_idx) {
+            auto* begin = partitions_indexes[partition_idx].data();
+            auto* end = begin + partitions_indexes[partition_idx].size();
+            auto& partition_block = partitioned_blocks[partition_idx];
+            vectorized::SpillStreamSPtr& spilling_stream =
+                    _shared_state->spilled_streams[partition_idx];
+            if (UNLIKELY(!partition_block)) {
+                partition_block =
+                        vectorized::MutableBlock::create_unique(build_block.clone_empty());
+            }
+
+            int64_t old_mem = partition_block->allocated_bytes();
+            {
+                SCOPED_TIMER(_partition_shuffle_timer);
+                RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end));
+                partitions_indexes[partition_idx].clear();
+            }
+            int64_t new_mem = partition_block->allocated_bytes();
+
+            if (partition_block->rows() >= reserved_size || is_last_block) {
+                auto block = partition_block->to_block();
+                RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false));
+                partition_block =
+                        vectorized::MutableBlock::create_unique(build_block.clone_empty());
+                COUNTER_UPDATE(_memory_used_counter, -new_mem);
+            } else {
+                COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem);
+            }
+        }
+    }
+
+    Status status;
+    if (_child_eos) {
+        std::ranges::for_each(_shared_state->partitioned_build_blocks, [&](auto& block) {
+            if (block) {
+                COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+            }
+        });
+        status = _finish_spilling();
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, hash join sink:{}, task:{}, _revoke_unpartitioned_block, "
+                "set_ready_to_read",
+                print_id(state->query_id()), _parent->node_id(), state->task_id());
+        _dependency->set_ready_to_read();
+    }
+
+    return status;
+}
+
 Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
@@ -210,96 +297,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         COUNTER_UPDATE(_memory_used_counter, build_block.allocated_bytes() - block_old_mem);
     }
-    auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
-        Defer defer1 {[&]() { update_memory_usage(); }};
-        auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
-        auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
-        std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count);
-
-        const size_t reserved_size = 4096;
-        std::ranges::for_each(partitions_indexes, [](std::vector<uint32_t>& indices) {
-            indices.reserve(reserved_size);
-        });
-
-        size_t total_rows = build_block.rows();
-        size_t offset = 1;
-        while (offset < total_rows) {
-            auto sub_block = build_block.clone_empty();
-            size_t this_run = std::min(reserved_size, total_rows - offset);
-
-            for (size_t i = 0; i != build_block.columns(); ++i) {
-                sub_block.get_by_position(i).column =
-                        build_block.get_by_position(i).column->cut(offset, this_run);
-            }
-            int64_t sub_blocks_memory_usage = sub_block.allocated_bytes();
-            COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage);
-            Defer defer2 {
-                    [&]() { COUNTER_UPDATE(_memory_used_counter, -sub_blocks_memory_usage); }};
-
-            offset += this_run;
-            const auto is_last_block = offset == total_rows;
-
-            {
-                SCOPED_TIMER(_partition_timer);
-                (void)_partitioner->do_partitioning(state, &sub_block);
-            }
-
-            const auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
-            for (size_t i = 0; i != sub_block.rows(); ++i) {
-                partitions_indexes[channel_ids[i]].emplace_back(i);
-            }
-
-            for (uint32_t partition_idx = 0; partition_idx != p._partition_count; ++partition_idx) {
-                auto* begin = partitions_indexes[partition_idx].data();
-                auto* end = begin + partitions_indexes[partition_idx].size();
-                auto& partition_block = partitioned_blocks[partition_idx];
-                vectorized::SpillStreamSPtr& spilling_stream =
-                        _shared_state->spilled_streams[partition_idx];
-                if (UNLIKELY(!partition_block)) {
-                    partition_block =
-                            vectorized::MutableBlock::create_unique(build_block.clone_empty());
-                }
-
-                int64_t old_mem = partition_block->allocated_bytes();
-                {
-                    SCOPED_TIMER(_partition_shuffle_timer);
-                    RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end));
-                    partitions_indexes[partition_idx].clear();
-                }
-                int64_t new_mem = partition_block->allocated_bytes();
-
-                if (partition_block->rows() >= reserved_size || is_last_block) {
-                    auto block = partition_block->to_block();
-                    RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false));
-                    partition_block =
-                            vectorized::MutableBlock::create_unique(build_block.clone_empty());
-                    COUNTER_UPDATE(_memory_used_counter, -new_mem);
-                } else {
-                    COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem);
-                }
-            }
-        }
-
-        Status status;
-        if (_child_eos) {
-            std::ranges::for_each(_shared_state->partitioned_build_blocks, [&](auto& block) {
-                if (block) {
-                    COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                }
-            });
-            status = _finish_spilling();
-            VLOG_DEBUG << fmt::format(
-                    "Query:{}, hash join sink:{}, task:{}, _revoke_unpartitioned_block, "
-                    "set_ready_to_read",
-                    print_id(state->query_id()), _parent->node_id(), state->task_id());
-            _dependency->set_ready_to_read();
-        }
-
-        return status;
-    };
-
-    auto exception_catch_func = [spill_func]() mutable {
-        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
+    auto exception_catch_func = [this, state, build_block = std::move(build_block)]() mutable {
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(
+                    return _execute_spill_unpartitioned_block(state, std::move(build_block)));
+        }();
         return status;
     };
@@ -330,6 +332,60 @@ Status PartitionedHashJoinSinkLocalState::terminate(RuntimeState* state) {
     return PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>::terminate(state);
 }
+Status PartitionedHashJoinSinkLocalState::_finish_spilling_callback(
+        RuntimeState* state, TUniqueId query_id,
+        const std::shared_ptr<SpillContext>& spill_context) {
+    Status status;
+    if (_child_eos) {
+        LOG(INFO) << fmt::format(
+                "Query:{}, hash join sink:{}, task:{}, finish spilling, set_ready_to_read",
+                print_id(query_id), _parent->node_id(), state->task_id());
+        std::ranges::for_each(_shared_state->partitioned_build_blocks, [&](auto& block) {
+            if (block) {
+                COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+            }
+        });
+        status = _finish_spilling();
+        _dependency->set_ready_to_read();
+    }
+
+    if (spill_context) {
+        spill_context->on_task_finished();
+    }
+
+    return status;
+}
+
+Status PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState* state,
+                                                                            TUniqueId query_id) {
+    DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
+        auto status = Status::InternalError(
+                "fault_inject partitioned_hash_join_sink revoke_memory canceled");
+        state->get_query_ctx()->cancel(status);
+        return status;
+    });
+    SCOPED_TIMER(_spill_build_timer);
+
+    for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
+        vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
+        DCHECK(spilling_stream != nullptr);
+        auto& mutable_block = _shared_state->partitioned_build_blocks[i];
+
+        if (!mutable_block ||
+            mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+            continue;
+        }
+
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(
+                    return _spill_to_disk(static_cast<uint32_t>(i), spilling_stream));
+        }();
+
+        RETURN_IF_ERROR(status);
+    }
+    return Status::OK();
+}
+
 Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     SCOPED_TIMER(_spill_total_timer);
@@ -344,62 +400,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     }
     const auto query_id = state->query_id();
-    auto spill_fin_cb = [this, state, query_id, spill_context]() {
-        Status status;
-        if (_child_eos) {
-            LOG(INFO) << fmt::format(
-                    "Query:{}, hash join sink:{}, task:{}, finish spilling, set_ready_to_read",
-                    print_id(query_id), _parent->node_id(), state->task_id());
-            std::ranges::for_each(_shared_state->partitioned_build_blocks, [&](auto& block) {
-                if (block) {
-                    COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                }
-            });
-            status = _finish_spilling();
-            _dependency->set_ready_to_read();
-        }
-
-        if (spill_context) {
-            spill_context->on_task_finished();
-        }
-
-        return status;
-    };
-
     SpillSinkRunnable spill_runnable(
             state, nullptr, operator_profile(),
-            [this, state, query_id] {
-                DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
-                    auto status = Status::InternalError(
-                            "fault_inject partitioned_hash_join_sink "
-                            "revoke_memory canceled");
-                    state->get_query_ctx()->cancel(status);
-                    return status;
-                });
-                SCOPED_TIMER(_spill_build_timer);
-
-                for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
-                    vectorized::SpillStreamSPtr& spilling_stream =
-                            _shared_state->spilled_streams[i];
-                    DCHECK(spilling_stream != nullptr);
-                    auto& mutable_block = _shared_state->partitioned_build_blocks[i];
-
-                    if (!mutable_block ||
-                        mutable_block->allocated_bytes() <
-                                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                        continue;
-                    }
-
-                    auto status = [&]() {
-                        RETURN_IF_CATCH_EXCEPTION(
-                                return _spill_to_disk(static_cast<uint32_t>(i), spilling_stream));
-                    }();
-
-                    RETURN_IF_ERROR(status);
-                }
-                return Status::OK();
-            },
-            spill_fin_cb);
+            [this, state, query_id] { return _execute_spill_partitioned_blocks(state, query_id); },
+            [this, state, query_id, spill_context]() {
+                return _finish_spilling_callback(state, query_id, spill_context);
+            });
     return spill_runnable.run();
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index a2fd8ea69ee..5d7cd5263db 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -71,8 +71,15 @@ protected:
     Status _revoke_unpartitioned_block(RuntimeState* state,
                                        const std::shared_ptr<SpillContext>& spill_context);
+    Status _execute_spill_unpartitioned_block(RuntimeState* state, vectorized::Block&& build_block);
+
     Status _finish_spilling();
+    Status _finish_spilling_callback(RuntimeState* state, TUniqueId query_id,
+                                     const std::shared_ptr<SpillContext>& spill_context);
+
+    Status _execute_spill_partitioned_blocks(RuntimeState* state, TUniqueId query_id);
+
     Status _setup_internal_operator(RuntimeState* state);
     friend class PartitionedHashJoinSinkOperatorX;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index bac215e3f3c..1880f1d2e76 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -179,6 +179,62 @@ size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
                                                                  eos);
 }
+Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state, TUniqueId query_id) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    Status status;
+    Defer defer {[&]() {
+        if (!status.ok() || state->is_cancelled()) {
+            if (!status.ok()) {
+                LOG(WARNING) << fmt::format(
+                        "Query:{}, sort sink:{}, task:{}, revoke memory error:{}",
+                        print_id(query_id), _parent->node_id(), state->task_id(), status);
+            }
+            _shared_state->close();
+        } else {
+            VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke memory finish",
+                                      print_id(query_id), _parent->node_id(), state->task_id());
+        }
+
+        if (!status.ok()) {
+            _shared_state->close();
+        }
+
+        _spilling_stream.reset();
+        state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
+        if (_eos) {
+            _dependency->set_ready_to_read();
+        }
+    }};
+
+    status = parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+    RETURN_IF_ERROR(status);
+
+    auto* sink_local_state = _runtime_state->get_sink_local_state();
+    update_profile(sink_local_state->custom_profile());
+
+    bool eos = false;
+    vectorized::Block block;
+
+    int32_t batch_size =
+            _shared_state->spill_block_batch_row_count > std::numeric_limits<int32_t>::max()
+                    ? std::numeric_limits<int32_t>::max()
+                    : static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
+    while (!eos && !state->is_cancelled()) {
+        {
+            SCOPED_TIMER(_spill_merge_sort_timer);
+            status = parent._sort_sink_operator->merge_sort_read_for_spill(
+                    _runtime_state.get(), &block, batch_size, &eos);
+        }
+        RETURN_IF_ERROR(status);
+        status = _spilling_stream->spill_block(state, block, eos);
+        RETURN_IF_ERROR(status);
+        block.clear_column_data();
+    }
+    parent._sort_sink_operator->reset(_runtime_state.get());
+
+    return Status::OK();
+}
+
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                               const std::shared_ptr<SpillContext>& spill_context) {
     auto& parent = Base::_parent->template cast<Parent>();
@@ -205,75 +261,17 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     _shared_state->sorted_streams.emplace_back(_spilling_stream);
     auto query_id = state->query_id();
-
-    auto spill_func = [this, state, query_id, &parent] {
-        Status status;
-        Defer defer {[&]() {
-            if (!status.ok() || state->is_cancelled()) {
-                if (!status.ok()) {
-                    LOG(WARNING) << fmt::format(
-                            "Query:{}, sort sink:{}, task:{}, revoke memory error:{}",
-                            print_id(query_id), _parent->node_id(), state->task_id(), status);
-                }
-                _shared_state->close();
-            } else {
-                VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke memory finish",
-                                          print_id(query_id), _parent->node_id(), state->task_id());
-            }
-
-            if (!status.ok()) {
-                _shared_state->close();
-            }
-
-            _spilling_stream.reset();
-            state->get_query_ctx()
-                    ->resource_ctx()
-                    ->task_controller()
-                    ->decrease_revoking_tasks_count();
-            if (_eos) {
-                _dependency->set_ready_to_read();
-            }
-        }};
-
-        status = parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
-        RETURN_IF_ERROR(status);
-
-        auto* sink_local_state = _runtime_state->get_sink_local_state();
-        update_profile(sink_local_state->custom_profile());
-
-        bool eos = false;
-        vectorized::Block block;
-
-        int32_t batch_size =
-                _shared_state->spill_block_batch_row_count > std::numeric_limits<int32_t>::max()
-                        ? std::numeric_limits<int32_t>::max()
-                        : static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
-        while (!eos && !state->is_cancelled()) {
-            {
-                SCOPED_TIMER(_spill_merge_sort_timer);
-                status = parent._sort_sink_operator->merge_sort_read_for_spill(
-                        _runtime_state.get(), &block, batch_size, &eos);
-            }
-            RETURN_IF_ERROR(status);
-            status = _spilling_stream->spill_block(state, block, eos);
-            RETURN_IF_ERROR(status);
-            block.clear_column_data();
-        }
-        parent._sort_sink_operator->reset(_runtime_state.get());
-
-        return Status::OK();
-    };
-
-    auto exception_catch_func = [query_id, state, spill_func]() {
+    auto exception_catch_func = [this, query_id, state]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
-            auto status = Status::InternalError(
-                    "fault_inject spill_sort_sink "
-                    "revoke_memory canceled");
+            auto status =
+                    Status::InternalError("fault_inject spill_sort_sink revoke_memory canceled");
             state->get_query_ctx()->cancel(status);
             return status;
         });
-        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION({ return _execute_spill_sort(state, query_id); });
+        }();
         return status;
     };
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 49a57c0ddda..84c5be478c5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -50,6 +50,8 @@ private:
     void _init_counters();
     void update_profile(RuntimeProfile* child_profile);
+    Status _execute_spill_sort(RuntimeState* state, TUniqueId query_id);
+
     friend class SpillSortSinkOperatorX;
     std::unique_ptr<RuntimeState> _runtime_state;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 66752161af6..d137574602f 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -72,104 +72,105 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
     return std::max(2, static_cast<int32_t>(count));
 }
-Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
+Status SpillSortLocalState::_execute_merge_sort_spill_streams(RuntimeState* state,
+                                                              TUniqueId query_id) {
     auto& parent = Base::_parent->template cast<Parent>();
-    VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data",
-                              print_id(state->query_id()), _parent->node_id(), state->task_id());
-
-    auto query_id = state->query_id();
-
-    auto spill_func = [this, state, query_id, &parent] {
-        SCOPED_TIMER(_spill_merge_sort_timer);
-        Status status;
-        Defer defer {[&]() {
-            if (!status.ok() || state->is_cancelled()) {
-                if (!status.ok()) {
-                    LOG(WARNING) << fmt::format(
-                            "Query:{}, sort source:{}, task:{}, merge spill data error:{}",
-                            print_id(query_id), _parent->node_id(), state->task_id(), status);
-                }
-                for (auto& stream : _current_merging_streams) {
-                    ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-                }
-                _current_merging_streams.clear();
-            } else {
-                VLOG_DEBUG << fmt::format(
-                        "Query:{}, sort source:{}, task:{}, merge spill data finish",
-                        print_id(query_id), _parent->node_id(), state->task_id());
+    SCOPED_TIMER(_spill_merge_sort_timer);
+    Status status;
+    Defer defer {[&]() {
+        if (!status.ok() || state->is_cancelled()) {
+            if (!status.ok()) {
+                LOG(WARNING) << fmt::format(
+                        "Query:{}, sort source:{}, task:{}, merge spill data error:{}",
+                        print_id(query_id), _parent->node_id(), state->task_id(), status);
             }
-        }};
-        vectorized::Block merge_sorted_block;
-        vectorized::SpillStreamSPtr tmp_stream;
-        while (!state->is_cancelled()) {
-            int max_stream_count = _calc_spill_blocks_to_merge(state);
-            VLOG_DEBUG << fmt::format(
-                    "Query:{}, sort source:{}, task:{}, merge spill streams, streams count:{}, "
-                    "curren merge max stream count:{}",
-                    print_id(query_id), _parent->node_id(), state->task_id(),
-                    _shared_state->sorted_streams.size(), max_stream_count);
-            {
-                SCOPED_TIMER(Base::_spill_recover_time);
-                status = _create_intermediate_merger(
-                        max_stream_count,
-                        parent._sort_source_operator->get_sort_description(_runtime_state.get()));
+            for (auto& stream : _current_merging_streams) {
+                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
             }
-            RETURN_IF_ERROR(status);
+            _current_merging_streams.clear();
+        } else {
+            VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data finish",
+                                      print_id(query_id), _parent->node_id(), state->task_id());
+        }
+    }};
+    vectorized::Block merge_sorted_block;
+    vectorized::SpillStreamSPtr tmp_stream;
+    while (!state->is_cancelled()) {
+        int max_stream_count = _calc_spill_blocks_to_merge(state);
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, sort source:{}, task:{}, merge spill streams, streams count:{}, "
+                "curren merge max stream count:{}",
+                print_id(query_id), _parent->node_id(), state->task_id(),
+                _shared_state->sorted_streams.size(), max_stream_count);
+        {
+            SCOPED_TIMER(Base::_spill_recover_time);
+            status = _create_intermediate_merger(
+                    max_stream_count,
+                    parent._sort_source_operator->get_sort_description(_runtime_state.get()));
+        }
+        RETURN_IF_ERROR(status);
-            // all the remaining streams can be merged in a run
-            if (_shared_state->sorted_streams.empty()) {
-                return Status::OK();
-            }
+        // all the remaining streams can be merged in a run
+        if (_shared_state->sorted_streams.empty()) {
+            return Status::OK();
+        }
-            {
-                int32_t batch_size =
-                        _shared_state->spill_block_batch_row_count >
-                                        std::numeric_limits<int32_t>::max()
-                                ? std::numeric_limits<int32_t>::max()
-                                : static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
-                status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                        state, tmp_stream, print_id(state->query_id()), "sort", _parent->node_id(),
-                        batch_size, state->spill_sort_batch_bytes(), operator_profile());
-                RETURN_IF_ERROR(status);
+        {
+            int32_t batch_size =
+                    _shared_state->spill_block_batch_row_count > std::numeric_limits<int32_t>::max()
+                            ? std::numeric_limits<int32_t>::max()
+                            : static_cast<int32_t>(_shared_state->spill_block_batch_row_count);
+            status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                    state, tmp_stream, print_id(state->query_id()), "sort", _parent->node_id(),
+                    batch_size, state->spill_sort_batch_bytes(), operator_profile());
+            RETURN_IF_ERROR(status);
-            _shared_state->sorted_streams.emplace_back(tmp_stream);
-
-            bool eos = false;
-            while (!eos && !state->is_cancelled()) {
-                merge_sorted_block.clear_column_data();
-                {
-                    SCOPED_TIMER(Base::_spill_recover_time);
-                    DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
-                        status = Status::Error<INTERNAL_ERROR>(
-                                "fault_inject spill_sort_source "
-                                "recover_spill_data failed");
-                    });
-                    if (status.ok()) {
-                        status = _merger->get_next(&merge_sorted_block, &eos);
-                    }
-                }
-                RETURN_IF_ERROR(status);
-                status = tmp_stream->spill_block(state, merge_sorted_block, eos);
+            _shared_state->sorted_streams.emplace_back(tmp_stream);
+
+            bool eos = false;
+            while (!eos && !state->is_cancelled()) {
+                merge_sorted_block.clear_column_data();
+                {
+                    SCOPED_TIMER(Base::_spill_recover_time);
+                    DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
+                        status = Status::Error<INTERNAL_ERROR>(
+                                "fault_inject spill_sort_source "
+                                "recover_spill_data failed");
+                    });
                     if (status.ok()) {
-                        DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
-                            status = Status::Error<INTERNAL_ERROR>(
-                                    "fault_inject spill_sort_source "
-                                    "spill_merged_data failed");
-                        });
+                        status = _merger->get_next(&merge_sorted_block, &eos);
                     }
-                    RETURN_IF_ERROR(status);
                 }
+                RETURN_IF_ERROR(status);
+                status = tmp_stream->spill_block(state, merge_sorted_block, eos);
+                if (status.ok()) {
+                    DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
+                        status = Status::Error<INTERNAL_ERROR>(
+                                "fault_inject spill_sort_source "
+                                "spill_merged_data failed");
+                    });
+                }
+                RETURN_IF_ERROR(status);
             }
-            for (auto& stream : _current_merging_streams) {
-                ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-            }
-            _current_merging_streams.clear();
         }
-        return Status::OK();
-    };
+        for (auto& stream : _current_merging_streams) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+        }
+        _current_merging_streams.clear();
+    }
+    return Status::OK();
+}
-    auto exception_catch_func = [spill_func]() {
-        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
+Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
+    VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill data",
+                              print_id(state->query_id()), _parent->node_id(), state->task_id());
+
+    auto query_id = state->query_id();
+    auto exception_catch_func = [this, state, query_id]() {
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION(
+                    { return _execute_merge_sort_spill_streams(state, query_id); });
+        }();
         return status;
     };
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h
index d37d5373ea1..f191d3c419b 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -53,6 +53,9 @@ protected:
     int _calc_spill_blocks_to_merge(RuntimeState* state) const;
     Status _create_intermediate_merger(int num_blocks,
                                        const vectorized::SortDescription& sort_description);
+
+    Status _execute_merge_sort_spill_streams(RuntimeState* state, TUniqueId query_id);
+
     friend class SpillSortSourceOperatorX;
     std::unique_ptr<RuntimeState> _runtime_state;
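
One further detail, visible in the hash join hunks above: revoke_memory there now hands SpillSinkRunnable two thin lambdas, the spill work (forwarding to _execute_spill_partitioned_blocks) and a finish callback (forwarding to _finish_spilling_callback). A sketch of that two-callback composition, again with a hypothetical stand-in type rather than the real SpillSinkRunnable:

    #include <functional>

    // Hypothetical two-callback runnable for illustration; the real
    // SpillSinkRunnable differs, this only shows how work + finish compose.
    struct TwoPhaseRunnable {
        std::function<int()> work;   // forwards to the spill routine
        std::function<int()> finish; // marks ready-to-read / notifies SpillContext
        int run() {
            int rc = work();           // spill each partitioned block to its stream
            int fin = finish();        // in this sketch, finish always runs after work
            return rc != 0 ? rc : fin; // surface the first failure
        }
    };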
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]