This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 74dbeb76e4a [improvement](spill) improve cancel (#34445)
74dbeb76e4a is described below
commit 74dbeb76e4a734cb9a05a043b92b26f275d7b470
Author: TengJianPing <[email protected]>
AuthorDate: Tue May 7 00:24:39 2024 +0800
[improvement](spill) improve cancel (#34445)
* [improvement](spill) improve cancel
* fix
---
.../exec/partitioned_aggregation_sink_operator.cpp | 26 ++-
.../exec/partitioned_aggregation_sink_operator.h | 5 -
.../partitioned_aggregation_source_operator.cpp | 20 +-
.../exec/partitioned_aggregation_source_operator.h | 5 -
.../exec/partitioned_hash_join_probe_operator.cpp | 214 +++++++++++++--------
.../exec/partitioned_hash_join_probe_operator.h | 5 -
.../exec/partitioned_hash_join_sink_operator.cpp | 68 +++++--
.../exec/partitioned_hash_join_sink_operator.h | 5 -
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 27 ++-
be/src/pipeline/exec/spill_sort_sink_operator.h | 5 -
.../pipeline/exec/spill_sort_source_operator.cpp | 19 +-
be/src/pipeline/exec/spill_sort_source_operator.h | 5 -
12 files changed, 245 insertions(+), 159 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 16faa90c9af..4d007ae9dc4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -28,9 +28,9 @@ namespace doris::pipeline {
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase*
parent,
RuntimeState* state)
: Base(parent, state) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ _finish_dependency = std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_SPILL_DEPENDENCY",
+ true,
state->get_query_ctx());
}
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo&
info) {
@@ -250,21 +250,31 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
}};
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- [this, &parent, state, query_id, execution_context, submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
+ [this, &parent, state, query_id, mem_tracker, shared_state_holder,
execution_context,
+ submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was
cancelled.";
return Status::Cancelled("Cancelled");
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_ATTACH_TASK(state);
SCOPED_TIMER(Base::_spill_timer);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1b415629f22..5badc4916eb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -261,11 +261,6 @@ public:
bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;
-
// temp structures during spilling
vectorized::MutableColumns key_columns_;
vectorized::MutableColumns value_columns_;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index ff4795f2079..43c805b9557 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -204,17 +204,28 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
_dependency->Dependency::block();
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
RETURN_IF_ERROR(
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- [this, state, query_id, execution_context, submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
+ [this, state, query_id, mem_tracker, shared_state_holder,
execution_context,
+ submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext>
execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe
query was cancelled.";
// FIXME: return status is meaningless?
@@ -222,7 +233,6 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
if (!_status.ok()) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index ad835948dd9..a847b7fcf88 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -59,11 +59,6 @@ protected:
bool _current_partition_eos = true;
bool _is_merging = false;
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;
-
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
RuntimeProfile::Counter* _get_results_timer = nullptr;
RuntimeProfile::Counter* _serialize_result_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index d8935003de6..10928024992 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -155,7 +155,6 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState*
state,
uint32_t
partition_index) {
- _shared_state_holder = _shared_state->shared_from_this();
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
if (!mutable_block ||
@@ -178,46 +177,58 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto execution_context = state->get_task_execution_context();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
+ auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+
MonotonicStopWatch submit_timer;
submit_timer.start();
- return spill_io_pool->submit_func(
- [execution_context, state, &build_spilling_stream, &mutable_block,
submit_timer, this] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
- LOG(INFO) << "execution_context released, maybe query was
cancelled.";
- return;
- }
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(_spill_build_timer);
- (void)state; // avoid ut compile error
- SCOPED_ATTACH_TASK(state);
- if (_spill_status_ok) {
- auto build_block = mutable_block->to_block();
- DCHECK_EQ(mutable_block->rows(), 0);
- auto st = build_spilling_stream->spill_block(state,
build_block, false);
- if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status_ok = false;
- _spill_status = std::move(st);
- } else {
- COUNTER_UPDATE(_spill_build_rows, build_block.rows());
- COUNTER_UPDATE(_spill_build_blocks, 1);
- }
- }
- --_spilling_task_count;
+ return spill_io_pool->submit_func([query_id, mem_tracker,
shared_state_holder,
+ execution_context, state,
&build_spilling_stream,
+ &mutable_block, submit_timer, this] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return;
+ }
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_build_timer);
+ if (_spill_status_ok) {
+ auto build_block = mutable_block->to_block();
+ DCHECK_EQ(mutable_block->rows(), 0);
+ auto st = build_spilling_stream->spill_block(state, build_block,
false);
+ if (!st.ok()) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _spill_status_ok = false;
+ _spill_status = std::move(st);
+ } else {
+ COUNTER_UPDATE(_spill_build_rows, build_block.rows());
+ COUNTER_UPDATE(_spill_build_blocks, 1);
+ }
+ }
+ --_spilling_task_count;
- if (_spilling_task_count == 0) {
- LOG(INFO) << "hash probe " << _parent->id()
- << " revoke memory spill_build_block finish";
- std::unique_lock<std::mutex> lock(_spill_lock);
- _dependency->set_ready();
- }
- });
+ if (_spilling_task_count == 0) {
+ LOG(INFO) << "hash probe " << _parent->id()
+ << " revoke memory spill_build_block finish";
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _dependency->set_ready();
+ }
+ });
}
Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state,
uint32_t
partition_index) {
- _shared_state_holder = _shared_state->shared_from_this();
auto& spilling_stream = _probe_spilling_streams[partition_index];
if (!spilling_stream) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -242,45 +253,60 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
if (!blocks.empty()) {
auto execution_context = state->get_task_execution_context();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
+
+ auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+
MonotonicStopWatch submit_timer;
submit_timer.start();
- return spill_io_pool->submit_func(
- [execution_context, state, &blocks, spilling_stream,
submit_timer, this] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
- LOG(INFO) << "execution_context released, maybe query
was cancelled.";
- return;
- }
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(_spill_probe_timer);
- SCOPED_ATTACH_TASK(state);
- COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
- while (!blocks.empty() && !state->is_cancelled()) {
- auto block = std::move(blocks.back());
- blocks.pop_back();
- if (_spill_status_ok) {
- auto st = spilling_stream->spill_block(state,
block, false);
- if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status_ok = false;
- _spill_status = std::move(st);
- break;
- }
- COUNTER_UPDATE(_spill_probe_rows, block.rows());
- } else {
- break;
- }
+ return spill_io_pool->submit_func([query_id, mem_tracker,
shared_state_holder,
+ execution_context, state, &blocks,
spilling_stream,
+ submit_timer, this] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return;
+ }
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_probe_timer);
+ COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+ while (!blocks.empty() && !state->is_cancelled()) {
+ auto block = std::move(blocks.back());
+ blocks.pop_back();
+ if (_spill_status_ok) {
+ auto st = spilling_stream->spill_block(state, block,
false);
+ if (!st.ok()) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _spill_status_ok = false;
+ _spill_status = std::move(st);
+ break;
}
+ COUNTER_UPDATE(_spill_probe_rows, block.rows());
+ } else {
+ break;
+ }
+ }
- --_spilling_task_count;
+ --_spilling_task_count;
- if (_spilling_task_count == 0) {
- LOG(INFO) << "hash probe " << _parent->id()
- << " revoke memory spill_probe_blocks
finish";
- std::unique_lock<std::mutex> lock(_spill_lock);
- _dependency->set_ready();
- }
- });
+ if (_spilling_task_count == 0) {
+ LOG(INFO) << "hash probe " << _parent->id()
+ << " revoke memory spill_probe_blocks finish";
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _dependency->set_ready();
+ }
+ });
} else {
--_spilling_task_count;
if (_spilling_task_count == 0) {
@@ -313,7 +339,6 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
- _shared_state_holder = _shared_state->shared_from_this();
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
@@ -328,23 +353,35 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
auto execution_context = state->get_task_execution_context();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
+
+ auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto read_func = [this, state, &spilled_stream, &mutable_block,
execution_context,
- submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock || state->is_cancelled()) {
- LOG(INFO) << "execution_context released, maybe query was
canceled.";
+ auto read_func = [this, query_id, mem_tracker, state, &spilled_stream,
&mutable_block,
+ shared_state_holder, execution_context, submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
return;
}
- SCOPED_ATTACH_TASK(state);
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_recovery_build_timer);
Defer defer([this] { --_spilling_task_count; });
- (void)state; // avoid ut compile error
DCHECK_EQ(_spill_status_ok.load(), true);
bool eos = false;
@@ -369,10 +406,10 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
break;
}
- DCHECK_EQ(mutable_block->columns(), block.columns());
if (mutable_block->empty()) {
*mutable_block = std::move(block);
} else {
+ DCHECK_EQ(mutable_block->columns(), block.columns());
st = mutable_block->merge(std::move(block));
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
@@ -404,7 +441,6 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
- _shared_state_holder = _shared_state->shared_from_this();
auto& spilled_stream = _probe_spilling_streams[partition_index];
has_data = false;
if (!spilled_stream) {
@@ -415,22 +451,32 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
/// TODO: maybe recovery more blocks each time.
auto execution_context = state->get_task_execution_context();
+ std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
+
+ auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto read_func = [this, execution_context, state, &spilled_stream,
&blocks, submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
- LOG(INFO) << "execution_context released, maybe query was
cancelled.";
+ auto read_func = [this, query_id, mem_tracker, shared_state_holder,
execution_context,
+ &spilled_stream, &blocks, submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
return;
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_recovery_probe_timer);
Defer defer([this] { --_spilling_task_count; });
- (void)state; // avoid ut compile error
- SCOPED_ATTACH_TASK(state);
DCHECK_EQ(_spill_status_ok.load(), true);
vectorized::Block block;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index f338d205407..1b37a725150 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -81,11 +81,6 @@ private:
std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;
-
std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;
std::unique_ptr<PartitionerType> _partitioner;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 80f1915bad1..890e68e732f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -102,6 +102,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
+ const auto num_slots = row_desc.num_slots();
std::vector<vectorized::Block> build_blocks;
auto inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state_) {
@@ -116,11 +117,18 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
}
auto execution_context = state->get_task_execution_context();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
+
_dependency->block();
auto query_id = state->query_id();
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
- auto spill_func = [execution_context, build_blocks =
std::move(build_blocks), state, query_id,
- mem_tracker, this]() mutable {
+ auto spill_func = [shared_state_holder, execution_context,
+ build_blocks = std::move(build_blocks), state,
query_id, mem_tracker,
+ num_slots, this]() mutable {
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
Defer defer {[&]() {
// need to reset build_block here, or else build_block will be
destructed
@@ -128,8 +136,12 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
build_blocks.clear();
}};
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock || state->is_cancelled()) {
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
LOG(INFO) << "execution_context released, maybe query was
canceled.";
return;
}
@@ -163,6 +175,11 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
if (UNLIKELY(build_block.empty())) {
continue;
}
+
+ if (build_block.columns() > num_slots) {
+ build_block.erase(num_slots);
+ }
+
{
SCOPED_TIMER(_partition_timer);
(void)_partitioner->do_partitioning(state, &build_block,
_mem_tracker.get());
@@ -213,13 +230,24 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
- _shared_state_holder = _shared_state->shared_from_this();
if (!_shared_state->need_to_spill) {
+ profile()->add_info_string("Spilled", "true");
_shared_state->need_to_spill = true;
return _revoke_unpartitioned_block(state);
}
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
+
+ auto execution_context = state->get_task_execution_context();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+ _shared_state->shared_from_this();
+
+ auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size();
++i) {
vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
@@ -235,24 +263,26 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
DCHECK(spill_io_pool != nullptr);
- auto execution_context = state->get_task_execution_context();
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto st = spill_io_pool->submit_func(
- [this, execution_context, state, spilling_stream, i,
submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
- LOG(INFO) << "execution_context released, maybe query
was cancelled.";
- return;
- }
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(_spill_build_timer);
- (void)state; // avoid ut compile error
- SCOPED_ATTACH_TASK(state);
- _spill_to_disk(i, spilling_stream);
- });
+ auto st = spill_io_pool->submit_func([this, query_id, mem_tracker,
shared_state_holder,
+ execution_context,
spilling_stream, i, submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "execution_context released, maybe query was
cancelled.";
+ return;
+ }
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_build_timer);
+ _spill_to_disk(i, spilling_stream);
+ });
if (!st.ok()) {
--_spilling_streams_count;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 46ab5eab619..e527d601fff 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -70,11 +70,6 @@ protected:
Status _spill_status;
std::mutex _spill_status_lock;
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;
-
std::unique_ptr<PartitionerType> _partitioner;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 2fdd78b40b3..c6a943c59b5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -23,9 +23,9 @@
namespace doris::pipeline {
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase*
parent, RuntimeState* state)
: Base(parent, state) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ _finish_dependency = std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_SPILL_DEPENDENCY",
+ true,
state->get_query_ctx());
}
Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
@@ -226,23 +226,34 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
}
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
+
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<SpillSortSharedState> shared_state_holder =
_shared_state->shared_from_this();
+
auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- [this, state, query_id, &parent, execution_context, submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
+ [this, state, query_id, mem_tracker, shared_state_holder, &parent,
execution_context,
+ submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was
cancelled.";
return Status::OK();
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index feba33bc96d..d6557454f9d 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -46,11 +46,6 @@ private:
friend class SpillSortSinkOperatorX;
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::shared_ptr<SpillSortSharedState> _shared_state_holder;
-
std::unique_ptr<RuntimeState> _runtime_state;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
RuntimeProfile::Counter* _partial_sort_timer = nullptr;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index a086fcc43e6..fe6b4ee3efc 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -88,15 +88,25 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}};
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
+ /// Resources in the shared state are released when the operator is closed,
+ /// but asynchronous spill I/O tasks may still be running at that point, which can
+ /// lead to conflicts. So the task captures only a weak pointer to the shared
+ /// state and returns early if the state has already been released.
+ std::weak_ptr<SpillSortSharedState> shared_state_holder =
_shared_state->shared_from_this();
auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto spill_func = [this, state, query_id, &parent, execution_context,
submit_timer] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
+ auto spill_func = [this, state, query_id, mem_tracker, &parent,
shared_state_holder,
+ execution_context, submit_timer] {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was
cancelled.";
return Status::OK();
@@ -104,7 +114,6 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_spill_merge_sort_timer);
- SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
if (!_status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index 1afe0597ad5..ffe8e8a6898 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -56,11 +56,6 @@ protected:
bool _opened = false;
Status _status;
- /// Resources in shared state will be released when the operator is closed,
- /// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::shared_ptr<SpillSortSharedState> _shared_state_holder;
-
int64_t _external_sort_bytes_threshold = 134217728; // 128M
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]