This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8fb501b09ef [refactor](spill) unify the entry point of spill tasks
(#37020)
8fb501b09ef is described below
commit 8fb501b09efd77a299ab3405ff25f7a652f14b7a
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Jul 1 09:52:32 2024 +0800
[refactor](spill) unify the entry point of spill tasks (#37020)
---
.../exec/partitioned_aggregation_sink_operator.cpp | 30 ++----
.../partitioned_aggregation_source_operator.cpp | 29 ++----
.../exec/partitioned_aggregation_source_operator.h | 1 -
.../exec/partitioned_hash_join_probe_operator.cpp | 76 +++-----------
.../exec/partitioned_hash_join_probe_operator.h | 4 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 112 +++++++--------------
.../exec/partitioned_hash_join_sink_operator.h | 3 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 30 +-----
.../pipeline/exec/spill_sort_source_operator.cpp | 27 +----
be/src/pipeline/exec/spill_sort_source_operator.h | 1 -
be/src/pipeline/exec/spill_utils.h | 76 ++++++++++++++
11 files changed, 152 insertions(+), 237 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index b833289e0e0..4399f3c7045 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -22,6 +22,7 @@
#include "aggregation_sink_operator.h"
#include "common/status.h"
+#include "pipeline/exec/spill_utils.h"
#include "runtime/fragment_mgr.h"
#include "vec/spill/spill_stream_manager.h"
@@ -253,14 +254,7 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
}
}};
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
- _shared_state->shared_from_this();
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -269,20 +263,10 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
"fault_inject partitioned_agg_sink revoke_memory submit_func
failed");
return status;
});
- status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- [this, &parent, state, query_id, mem_tracker, shared_state_holder,
execution_context,
- submit_timer] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return Status::Cancelled("Cancelled");
- }
+
+ auto spill_runnable = std::make_shared<SpillRunnable>(
+ state, _shared_state->shared_from_this(),
+ [this, &parent, state, query_id, submit_timer] {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_sink "
@@ -332,7 +316,9 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
parent._agg_sink_operator->reset_hash_table(runtime_state);
return Base::_shared_state->sink_status;
});
- return status;
+
+ return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+ std::move(spill_runnable));
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fd609d95eef..a8c4e7b0bcc 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -23,6 +23,7 @@
#include "common/exception.h"
#include "common/status.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
#include "runtime/fragment_mgr.h"
#include "util/runtime_profile.h"
#include "vec/spill/spill_stream_manager.h"
@@ -204,18 +205,11 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
_dependency->Dependency::block();
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
- _shared_state->shared_from_this();
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto spill_func = [this, state, query_id, execution_context, submit_timer]
{
+ auto spill_func = [this, state, query_id, submit_timer] {
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
@@ -276,19 +270,7 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
return _status;
};
- auto exception_catch_func = [spill_func, query_id, mem_tracker,
shared_state_holder,
- execution_context, this]() {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
+ auto exception_catch_func = [spill_func, query_id, this]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
{
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
@@ -308,7 +290,8 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_agg_source submit_func failed");
});
- return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- exception_catch_func);
+ return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+ std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ exception_catch_func));
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index a847b7fcf88..994290a15bb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -94,7 +94,6 @@ public:
private:
friend class PartitionedAggLocalState;
- Status _initiate_merge_spill_partition_agg_data(RuntimeState* state);
std::unique_ptr<AggSourceOperatorX> _agg_source_operator;
};
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 261979e3261..1ff927bcc6d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -158,15 +158,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state) {
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
- _shared_state->shared_from_this();
-
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -216,19 +208,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
return Status::OK();
};
- auto exception_catch_func = [query_id, mem_tracker, shared_state_holder,
execution_context,
- spill_func, this]() {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
+ auto exception_catch_func = [query_id, spill_func, this]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
@@ -250,7 +230,10 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_probe spill_probe_blocks
submit_func failed");
});
- return spill_io_pool->submit_func(exception_catch_func);
+
+ auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+
exception_catch_func);
+ return spill_io_pool->submit(std::move(spill_runnable));
}
Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t
partition_index) {
@@ -288,15 +271,10 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
return Status::OK();
}
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
_shared_state->shared_from_this();
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -362,19 +340,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
_dependency->set_ready();
};
- auto exception_catch_func = [read_func, query_id, mem_tracker,
shared_state_holder,
- execution_context, state, this]() {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
+ auto exception_catch_func = [read_func, query_id, this]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
@@ -403,7 +369,9 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
"fault_inject partitioned_hash_join_probe "
"recovery_build_blocks submit_func failed");
});
- return spill_io_pool->submit_func(exception_catch_func);
+ auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+
exception_catch_func);
+ return spill_io_pool->submit(std::move(spill_runnable));
}
std::string PartitionedHashJoinProbeLocalState::debug_string(int
indentation_level) const {
@@ -426,14 +394,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
auto& blocks = _probe_blocks[partition_index];
- /// TODO: maybe recovery more blocks each time.
- auto execution_context = state->get_task_execution_context();
- std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
- _shared_state->shared_from_this();
-
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -470,19 +431,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
_dependency->set_ready();
};
- auto exception_catch_func = [read_func, mem_tracker, shared_state_holder,
execution_context,
- query_id, this]() {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
+ auto exception_catch_func = [read_func, query_id, this]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
@@ -511,7 +460,8 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
"fault_inject partitioned_hash_join_probe "
"recovery_probe_blocks submit_func failed");
});
- return spill_io_pool->submit_func(exception_catch_func);
+ return spill_io_pool->submit(std::make_shared<SpillRunnable>(
+ state, _shared_state->shared_from_this(), exception_catch_func));
}
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
pool,
@@ -538,7 +488,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const
TPlanNode& tnode, RuntimeSt
for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) {
_probe_exprs.emplace_back(conjunct.left);
}
- _partitioner = std::make_unique<PartitionerType>(_partition_count);
+ _partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
RETURN_IF_ERROR(_partitioner->init(_probe_exprs));
return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 1fbed85a123..6ee718a3354 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -24,15 +24,13 @@
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/join_build_sink_operator.h"
-#include "vec/runtime/partitioner.h"
+#include "pipeline/exec/spill_utils.h"
namespace doris {
class RuntimeState;
namespace pipeline {
-using PartitionerType =
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
-
class PartitionedHashJoinProbeOperatorX;
class PartitionedHashJoinProbeLocalState final
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index cb104cfc7cd..4aa0bd42a84 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -118,16 +118,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
return Status::OK();
}
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
- _shared_state->shared_from_this();
- auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
- auto spill_func = [state, num_slots,
- this](std::vector<vectorized::Block>& build_blocks)
mutable {
+ auto spill_func = [build_blocks = std::move(build_blocks), state,
num_slots, this]() mutable {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<std::vector<uint32_t>>
partitions_indexes(p._partition_count);
@@ -211,28 +202,9 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
_dependency->set_ready();
};
- auto exception_catch_func = [build_blocks = std::move(build_blocks),
spill_func,
- shared_state_holder, execution_context,
state, query_id,
- mem_tracker, this]() mutable {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- Defer defer {[&]() {
- // need to reset build_block here, or else build_block will be
destructed
- // after SCOPED_ATTACH_TASK_WITH_ID and will trigger
memory_orphan_check failure
- build_blocks.clear();
- }};
-
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
- LOG(INFO) << "execution_context released, maybe query was
canceled.";
- return;
- }
-
+ auto exception_catch_func = [spill_func, this]() mutable {
auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(spill_func(build_blocks));
+ RETURN_IF_CATCH_EXCEPTION(spill_func());
return Status::OK();
}();
@@ -243,6 +215,10 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
_dependency->set_ready();
}
};
+
+ auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+
exception_catch_func);
+
auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
_dependency->block();
@@ -252,7 +228,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
"fault_inject partitioned_hash_join_sink "
"revoke_unpartitioned_block submit_func failed");
});
- return thread_pool->submit_func(exception_catch_func);
+ return thread_pool->submit(std::move(spill_runnable));
}
Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
@@ -268,15 +244,7 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
- _shared_state->shared_from_this();
-
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size();
++i) {
vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
@@ -302,41 +270,35 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
st = Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_sink revoke_memory
submit_func failed");
});
- if (st.ok()) {
- st = spill_io_pool->submit_func([this, query_id, mem_tracker,
shared_state_holder,
- execution_context,
spilling_stream, i, submit_timer] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "execution_context released, maybe query was
cancelled.";
- return;
- }
-
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
{
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_id,
- Status::InternalError("fault_inject
partitioned_hash_join_sink "
- "revoke_memory canceled"));
- return;
+
+ auto spill_runnable = std::make_shared<SpillRunnable>(
+ state, _shared_state->shared_from_this(),
+ [this, query_id, spilling_stream, i, submit_timer] {
+ DBUG_EXECUTE_IF(
+
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
+
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ query_id, Status::InternalError(
+ "fault_inject
partitioned_hash_join_sink "
+ "revoke_memory
canceled"));
+ return;
+ });
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_build_timer);
+
+ auto status = [&]() {
+ RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i,
spilling_stream));
+ return Status::OK();
+ }();
+
+ if (!status.OK()) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _dependency->set_ready();
+ _spill_status_ok = false;
+ _spill_status = std::move(status);
+ }
});
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(_spill_build_timer);
-
- auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i,
spilling_stream));
- return Status::OK();
- }();
-
- if (!status.OK()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _dependency->set_ready();
- _spill_status_ok = false;
- _spill_status = std::move(status);
- }
- });
+ if (st.ok()) {
+ st = spill_io_pool->submit(std::move(spill_runnable));
}
if (!st.ok()) {
@@ -452,7 +414,7 @@ Status PartitionedHashJoinSinkOperatorX::init(const
TPlanNode& tnode, RuntimeSta
_build_exprs.emplace_back(eq_join_conjunct.right);
partition_exprs.emplace_back(eq_join_conjunct.right);
}
- _partitioner = std::make_unique<PartitionerType>(_partition_count);
+ _partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
RETURN_IF_ERROR(_partitioner->init(_build_exprs));
return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 5cd3ff21187..1592c29cdb0 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -24,6 +24,7 @@
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/join_build_sink_operator.h"
+#include "pipeline/exec/spill_utils.h"
#include "vec/runtime/partitioner.h"
namespace doris {
@@ -31,8 +32,6 @@ class RuntimeState;
namespace pipeline {
-using PartitionerType =
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
-
class PartitionedHashJoinSinkOperatorX;
class PartitionedHashJoinSinkLocalState
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index d15b936b4c6..b7fae82ca54 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -18,6 +18,7 @@
#include "spill_sort_sink_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
+#include "pipeline/exec/spill_utils.h"
#include "runtime/fragment_mgr.h"
#include "vec/spill/spill_stream_manager.h"
@@ -229,16 +230,7 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
if (!_eos) {
Base::_dependency->Dependency::block();
}
-
- auto execution_context = state->get_task_execution_context();
-
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<SpillSortSharedState> shared_state_holder =
_shared_state->shared_from_this();
-
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -300,19 +292,7 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
return Status::OK();
};
- auto exception_catch_func = [this, query_id, mem_tracker,
shared_state_holder,
- execution_context, spill_func]() {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
+ auto exception_catch_func = [this, query_id, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_id, Status::InternalError("fault_inject
spill_sort_sink "
@@ -331,9 +311,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state) {
"revoke_memory submit_func failed");
});
if (status.ok()) {
- status =
-
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- exception_catch_func);
+ status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+ std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ exception_catch_func));
}
if (!status.ok()) {
if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 7d62a18461b..b322f33caa2 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -18,6 +18,7 @@
#include "spill_sort_source_operator.h"
#include "common/status.h"
+#include "pipeline/exec/spill_utils.h"
#include "runtime/fragment_mgr.h"
#include "sort_source_operator.h"
#include "util/runtime_profile.h"
@@ -80,13 +81,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
<< " merge spill data";
_dependency->Dependency::block();
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<SpillSortSharedState> shared_state_holder =
_shared_state->shared_from_this();
auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -180,20 +175,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
return Status::OK();
};
- auto exception_catch_func = [this, query_id, mem_tracker,
shared_state_holder,
- execution_context, spill_func]() {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
-
+ auto exception_catch_func = [this, spill_func]() {
_status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
}();
};
@@ -202,8 +184,9 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
"fault_inject spill_sort_source "
"merge_sort_spill_data submit_func failed");
});
- return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- exception_catch_func);
+ return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
+ std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
+ exception_catch_func));
}
Status SpillSortLocalState::_create_intermediate_merger(
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index ffe8e8a6898..09367415d91 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -90,7 +90,6 @@ public:
private:
friend class SpillSortLocalState;
- Status _initiate_merge_spill_partition_agg_data(RuntimeState* state);
std::unique_ptr<SortSourceOperatorX> _sort_source_operator;
};
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
new file mode 100644
index 00000000000..f2f19512cbd
--- /dev/null
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+
+#include "common/logging.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/query_context.h"
+#include "runtime/runtime_state.h"
+#include "runtime/task_execution_context.h"
+#include "runtime/thread_context.h"
+#include "util/defer.h"
+#include "util/threadpool.h"
+#include "util/uid_util.h"
+#include "vec/runtime/partitioner.h"
+
+namespace doris::pipeline {
+/// Hash partitioner used by the partitioned (spillable) hash-join build and probe operators.
+using SpillPartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
+
+/// Unified entry point for asynchronous spill tasks submitted to the spill IO thread pool.
+///
+/// Resources in an operator's shared state are released when the operator is closed, but
+/// an asynchronous spill task may still be queued at that point. To avoid that conflict the
+/// runnable holds only weak references to the task execution context and the shared state,
+/// and runs `func` only if both can still be locked and the query is not cancelled.
+///
+/// `func` runs with the query memory tracker attached, and `func` (together with everything
+/// it captured) is also destroyed inside that attached scope, so memory freed by captured
+/// objects is not reported by the memory orphan check.
+class SpillRunnable : public Runnable {
+public:
+    /// @param state        runtime state of the submitting task. It is dereferenced in run()
+    ///                     only after the task execution context has been locked.
+    /// @param shared_state shared state of the submitting operator; held weakly.
+    /// @param func         the spill work to execute.
+    SpillRunnable(RuntimeState* state, const std::shared_ptr<BasicSharedState>& shared_state,
+                  std::function<void()> func)
+            : _state(state),
+              _mem_tracker(state->get_query_ctx()->query_mem_tracker),
+              _task_id(state->query_id()),
+              _task_context_holder(state->get_task_execution_context()),
+              _shared_state_holder(shared_state),
+              _func(std::move(func)) {}
+
+    ~SpillRunnable() override = default;
+
+    void run() override {
+        SCOPED_ATTACH_TASK_WITH_ID(_mem_tracker, _task_id);
+        // Release `_func` and its captures while the query mem tracker is still attached.
+        Defer defer([&] {
+            std::function<void()> tmp;
+            std::swap(tmp, _func);
+        });
+
+        // Lock the execution context before touching `_state`; `_state` is assumed to be
+        // valid only while the task execution context is alive.
+        auto task_context_holder = _task_context_holder.lock();
+        if (!task_context_holder) {
+            LOG(INFO) << "query " << print_id(_task_id)
+                      << " execution_context released, maybe query was cancelled.";
+            return;
+        }
+
+        auto shared_state_holder = _shared_state_holder.lock();
+        if (!shared_state_holder) {
+            LOG(INFO) << "query " << print_id(_task_id)
+                      << " shared state released, maybe query was cancelled.";
+            return;
+        }
+
+        if (_state->is_cancelled()) {
+            LOG(INFO) << "query " << print_id(_task_id) << " was cancelled, skip spill task.";
+            return;
+        }
+        _func();
+    }
+
+private:
+    RuntimeState* _state;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    TUniqueId _task_id;
+    std::weak_ptr<TaskExecutionContext> _task_context_holder;
+    std::weak_ptr<BasicSharedState> _shared_state_holder;
+    std::function<void()> _func;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]