This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 74dbeb76e4a [improvement](spill) improve cancel (#34445)
74dbeb76e4a is described below

commit 74dbeb76e4a734cb9a05a043b92b26f275d7b470
Author: TengJianPing <[email protected]>
AuthorDate: Tue May 7 00:24:39 2024 +0800

    [improvement](spill) improve cancel (#34445)
    
    * [improvement](spill) improve cancel
    
    * fix
---
 .../exec/partitioned_aggregation_sink_operator.cpp |  26 ++-
 .../exec/partitioned_aggregation_sink_operator.h   |   5 -
 .../partitioned_aggregation_source_operator.cpp    |  20 +-
 .../exec/partitioned_aggregation_source_operator.h |   5 -
 .../exec/partitioned_hash_join_probe_operator.cpp  | 214 +++++++++++++--------
 .../exec/partitioned_hash_join_probe_operator.h    |   5 -
 .../exec/partitioned_hash_join_sink_operator.cpp   |  68 +++++--
 .../exec/partitioned_hash_join_sink_operator.h     |   5 -
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  27 ++-
 be/src/pipeline/exec/spill_sort_sink_operator.h    |   5 -
 .../pipeline/exec/spill_sort_source_operator.cpp   |  19 +-
 be/src/pipeline/exec/spill_sort_source_operator.h  |   5 -
 12 files changed, 245 insertions(+), 159 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 16faa90c9af..4d007ae9dc4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -28,9 +28,9 @@ namespace doris::pipeline {
 
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase*
 parent,
                                                            RuntimeState* state)
         : Base(parent, state) {
-    _finish_dependency = std::make_shared<FinishDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-            state->get_query_ctx());
+    _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                                      parent->get_name() + 
"_SPILL_DEPENDENCY",
+                                                      true, 
state->get_query_ctx());
 }
 Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
                                           doris::pipeline::LocalSinkStateInfo& 
info) {
@@ -250,21 +250,31 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     }};
 
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
     auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
     status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            [this, &parent, state, query_id, execution_context, submit_timer] {
-                auto execution_context_lock = execution_context.lock();
-                if (!execution_context_lock) {
+            [this, &parent, state, query_id, mem_tracker, shared_state_holder, 
execution_context,
+             submit_timer] {
+                SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+                std::shared_ptr<TaskExecutionContext> execution_context_lock;
+                auto shared_state_sptr = shared_state_holder.lock();
+                if (shared_state_sptr) {
+                    execution_context_lock = execution_context.lock();
+                }
+                if (!shared_state_sptr || !execution_context_lock) {
                     LOG(INFO) << "query " << print_id(query_id)
                               << " execution_context released, maybe query was 
cancelled.";
                     return Status::Cancelled("Cancelled");
                 }
                 
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                SCOPED_ATTACH_TASK(state);
                 SCOPED_TIMER(Base::_spill_timer);
                 Defer defer {[&]() {
                     if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1b415629f22..5badc4916eb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -261,11 +261,6 @@ public:
     bool _eos = false;
     std::shared_ptr<Dependency> _finish_dependency;
 
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;
-
     // temp structures during spilling
     vectorized::MutableColumns key_columns_;
     vectorized::MutableColumns value_columns_;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index ff4795f2079..43c805b9557 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -204,17 +204,28 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     _dependency->Dependency::block();
 
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
     auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
     RETURN_IF_ERROR(
             
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-                    [this, state, query_id, execution_context, submit_timer] {
-                        auto execution_context_lock = execution_context.lock();
-                        if (!execution_context_lock) {
+                    [this, state, query_id, mem_tracker, shared_state_holder, 
execution_context,
+                     submit_timer] {
+                        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+                        std::shared_ptr<TaskExecutionContext> 
execution_context_lock;
+                        auto shared_state_sptr = shared_state_holder.lock();
+                        if (shared_state_sptr) {
+                            execution_context_lock = execution_context.lock();
+                        }
+                        if (!shared_state_sptr || !execution_context_lock) {
                             LOG(INFO) << "query " << print_id(query_id)
                                       << " execution_context released, maybe 
query was cancelled.";
                             // FIXME: return status is meaningless?
@@ -222,7 +233,6 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                         }
 
                         
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                        SCOPED_ATTACH_TASK(state);
                         Defer defer {[&]() {
                             if (!_status.ok() || state->is_cancelled()) {
                                 if (!_status.ok()) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index ad835948dd9..a847b7fcf88 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -59,11 +59,6 @@ protected:
     bool _current_partition_eos = true;
     bool _is_merging = false;
 
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;
-
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _get_results_timer = nullptr;
     RuntimeProfile::Counter* _serialize_result_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index d8935003de6..10928024992 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -155,7 +155,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
 
 Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* 
state,
                                                              uint32_t 
partition_index) {
-    _shared_state_holder = _shared_state->shared_from_this();
     auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
     auto& mutable_block = partitioned_build_blocks[partition_index];
     if (!mutable_block ||
@@ -178,46 +177,58 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto execution_context = state->get_task_execution_context();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
+    auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+
     MonotonicStopWatch submit_timer;
     submit_timer.start();
-    return spill_io_pool->submit_func(
-            [execution_context, state, &build_spilling_stream, &mutable_block, 
submit_timer, this] {
-                auto execution_context_lock = execution_context.lock();
-                if (!execution_context_lock) {
-                    LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
-                    return;
-                }
-                
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                SCOPED_TIMER(_spill_build_timer);
-                (void)state; // avoid ut compile error
-                SCOPED_ATTACH_TASK(state);
-                if (_spill_status_ok) {
-                    auto build_block = mutable_block->to_block();
-                    DCHECK_EQ(mutable_block->rows(), 0);
-                    auto st = build_spilling_stream->spill_block(state, 
build_block, false);
-                    if (!st.ok()) {
-                        std::unique_lock<std::mutex> lock(_spill_lock);
-                        _spill_status_ok = false;
-                        _spill_status = std::move(st);
-                    } else {
-                        COUNTER_UPDATE(_spill_build_rows, build_block.rows());
-                        COUNTER_UPDATE(_spill_build_blocks, 1);
-                    }
-                }
-                --_spilling_task_count;
+    return spill_io_pool->submit_func([query_id, mem_tracker, 
shared_state_holder,
+                                       execution_context, state, 
&build_spilling_stream,
+                                       &mutable_block, submit_timer, this] {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
+            return;
+        }
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        SCOPED_TIMER(_spill_build_timer);
+        if (_spill_status_ok) {
+            auto build_block = mutable_block->to_block();
+            DCHECK_EQ(mutable_block->rows(), 0);
+            auto st = build_spilling_stream->spill_block(state, build_block, 
false);
+            if (!st.ok()) {
+                std::unique_lock<std::mutex> lock(_spill_lock);
+                _spill_status_ok = false;
+                _spill_status = std::move(st);
+            } else {
+                COUNTER_UPDATE(_spill_build_rows, build_block.rows());
+                COUNTER_UPDATE(_spill_build_blocks, 1);
+            }
+        }
+        --_spilling_task_count;
 
-                if (_spilling_task_count == 0) {
-                    LOG(INFO) << "hash probe " << _parent->id()
-                              << " revoke memory spill_build_block finish";
-                    std::unique_lock<std::mutex> lock(_spill_lock);
-                    _dependency->set_ready();
-                }
-            });
+        if (_spilling_task_count == 0) {
+            LOG(INFO) << "hash probe " << _parent->id()
+                      << " revoke memory spill_build_block finish";
+            std::unique_lock<std::mutex> lock(_spill_lock);
+            _dependency->set_ready();
+        }
+    });
 }
 
 Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state,
                                                               uint32_t 
partition_index) {
-    _shared_state_holder = _shared_state->shared_from_this();
     auto& spilling_stream = _probe_spilling_streams[partition_index];
     if (!spilling_stream) {
         
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -242,45 +253,60 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
 
     if (!blocks.empty()) {
         auto execution_context = state->get_task_execution_context();
+        /// Resources in shared state will be released when the operator is 
closed,
+        /// but there may be asynchronous spilling tasks at this time, which 
can lead to conflicts.
+        /// So, we need hold the pointer of shared state.
+        std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+                _shared_state->shared_from_this();
+
+        auto query_id = state->query_id();
+        auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+
         MonotonicStopWatch submit_timer;
         submit_timer.start();
-        return spill_io_pool->submit_func(
-                [execution_context, state, &blocks, spilling_stream, 
submit_timer, this] {
-                    auto execution_context_lock = execution_context.lock();
-                    if (!execution_context_lock) {
-                        LOG(INFO) << "execution_context released, maybe query 
was cancelled.";
-                        return;
-                    }
-                    
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                    SCOPED_TIMER(_spill_probe_timer);
-                    SCOPED_ATTACH_TASK(state);
-                    COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
-                    while (!blocks.empty() && !state->is_cancelled()) {
-                        auto block = std::move(blocks.back());
-                        blocks.pop_back();
-                        if (_spill_status_ok) {
-                            auto st = spilling_stream->spill_block(state, 
block, false);
-                            if (!st.ok()) {
-                                std::unique_lock<std::mutex> lock(_spill_lock);
-                                _spill_status_ok = false;
-                                _spill_status = std::move(st);
-                                break;
-                            }
-                            COUNTER_UPDATE(_spill_probe_rows, block.rows());
-                        } else {
-                            break;
-                        }
+        return spill_io_pool->submit_func([query_id, mem_tracker, 
shared_state_holder,
+                                           execution_context, state, &blocks, 
spilling_stream,
+                                           submit_timer, this] {
+            SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+            std::shared_ptr<TaskExecutionContext> execution_context_lock;
+            auto shared_state_sptr = shared_state_holder.lock();
+            if (shared_state_sptr) {
+                execution_context_lock = execution_context.lock();
+            }
+            if (!shared_state_sptr || !execution_context_lock) {
+                LOG(INFO) << "query " << print_id(query_id)
+                          << " execution_context released, maybe query was 
cancelled.";
+                return;
+            }
+            _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+            SCOPED_TIMER(_spill_probe_timer);
+            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+            while (!blocks.empty() && !state->is_cancelled()) {
+                auto block = std::move(blocks.back());
+                blocks.pop_back();
+                if (_spill_status_ok) {
+                    auto st = spilling_stream->spill_block(state, block, 
false);
+                    if (!st.ok()) {
+                        std::unique_lock<std::mutex> lock(_spill_lock);
+                        _spill_status_ok = false;
+                        _spill_status = std::move(st);
+                        break;
                     }
+                    COUNTER_UPDATE(_spill_probe_rows, block.rows());
+                } else {
+                    break;
+                }
+            }
 
-                    --_spilling_task_count;
+            --_spilling_task_count;
 
-                    if (_spilling_task_count == 0) {
-                        LOG(INFO) << "hash probe " << _parent->id()
-                                  << " revoke memory spill_probe_blocks 
finish";
-                        std::unique_lock<std::mutex> lock(_spill_lock);
-                        _dependency->set_ready();
-                    }
-                });
+            if (_spilling_task_count == 0) {
+                LOG(INFO) << "hash probe " << _parent->id()
+                          << " revoke memory spill_probe_blocks finish";
+                std::unique_lock<std::mutex> lock(_spill_lock);
+                _dependency->set_ready();
+            }
+        });
     } else {
         --_spilling_task_count;
         if (_spilling_task_count == 0) {
@@ -313,7 +339,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
 Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
 state,
                                                                            
uint32_t partition_index,
                                                                            
bool& has_data) {
-    _shared_state_holder = _shared_state->shared_from_this();
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -328,23 +353,35 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     }
 
     auto execution_context = state->get_task_execution_context();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
+
+    auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto read_func = [this, state, &spilled_stream, &mutable_block, 
execution_context,
-                      submit_timer] {
-        auto execution_context_lock = execution_context.lock();
-        if (!execution_context_lock || state->is_cancelled()) {
-            LOG(INFO) << "execution_context released, maybe query was 
canceled.";
+    auto read_func = [this, query_id, mem_tracker, state, &spilled_stream, 
&mutable_block,
+                      shared_state_holder, execution_context, submit_timer] {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
             return;
         }
 
-        SCOPED_ATTACH_TASK(state);
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_recovery_build_timer);
         Defer defer([this] { --_spilling_task_count; });
-        (void)state; // avoid ut compile error
         DCHECK_EQ(_spill_status_ok.load(), true);
 
         bool eos = false;
@@ -369,10 +406,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                 break;
             }
 
-            DCHECK_EQ(mutable_block->columns(), block.columns());
             if (mutable_block->empty()) {
                 *mutable_block = std::move(block);
             } else {
+                DCHECK_EQ(mutable_block->columns(), block.columns());
                 st = mutable_block->merge(std::move(block));
                 if (!st.ok()) {
                     std::unique_lock<std::mutex> lock(_spill_lock);
@@ -404,7 +441,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
 Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState*
 state,
                                                                            
uint32_t partition_index,
                                                                            
bool& has_data) {
-    _shared_state_holder = _shared_state->shared_from_this();
     auto& spilled_stream = _probe_spilling_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -415,22 +451,32 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
 
     /// TODO: maybe recovery more blocks each time.
     auto execution_context = state->get_task_execution_context();
+    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
+
+    auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto read_func = [this, execution_context, state, &spilled_stream, 
&blocks, submit_timer] {
-        auto execution_context_lock = execution_context.lock();
-        if (!execution_context_lock) {
-            LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
+    auto read_func = [this, query_id, mem_tracker, shared_state_holder, 
execution_context,
+                      &spilled_stream, &blocks, submit_timer] {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was 
cancelled.";
             return;
         }
 
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_recovery_probe_timer);
         Defer defer([this] { --_spilling_task_count; });
-        (void)state; // avoid ut compile error
-        SCOPED_ATTACH_TASK(state);
         DCHECK_EQ(_spill_status_ok.load(), true);
 
         vectorized::Block block;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index f338d205407..1b37a725150 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -81,11 +81,6 @@ private:
     std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
     std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
 
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;
-
     std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;
 
     std::unique_ptr<PartitionerType> _partitioner;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 80f1915bad1..890e68e732f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -102,6 +102,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     _shared_state->inner_shared_state->hash_table_variants.reset();
     auto row_desc = p._child_x->row_desc();
+    const auto num_slots = row_desc.num_slots();
     std::vector<vectorized::Block> build_blocks;
     auto inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
     if (inner_sink_state_) {
@@ -116,11 +117,18 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     }
 
     auto execution_context = state->get_task_execution_context();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
+
     _dependency->block();
     auto query_id = state->query_id();
     auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-    auto spill_func = [execution_context, build_blocks = 
std::move(build_blocks), state, query_id,
-                       mem_tracker, this]() mutable {
+    auto spill_func = [shared_state_holder, execution_context,
+                       build_blocks = std::move(build_blocks), state, 
query_id, mem_tracker,
+                       num_slots, this]() mutable {
         SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
         Defer defer {[&]() {
             // need to reset build_block here, or else build_block will be 
destructed
@@ -128,8 +136,12 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             build_blocks.clear();
         }};
 
-        auto execution_context_lock = execution_context.lock();
-        if (!execution_context_lock || state->is_cancelled()) {
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
             LOG(INFO) << "execution_context released, maybe query was 
canceled.";
             return;
         }
@@ -163,6 +175,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             if (UNLIKELY(build_block.empty())) {
                 continue;
             }
+
+            if (build_block.columns() > num_slots) {
+                build_block.erase(num_slots);
+            }
+
             {
                 SCOPED_TIMER(_partition_timer);
                 (void)_partitioner->do_partitioning(state, &build_block, 
_mem_tracker.get());
@@ -213,13 +230,24 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
               << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
 
-    _shared_state_holder = _shared_state->shared_from_this();
     if (!_shared_state->need_to_spill) {
+        profile()->add_info_string("Spilled", "true");
         _shared_state->need_to_spill = true;
         return _revoke_unpartitioned_block(state);
     }
 
     _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
+
+    auto execution_context = state->get_task_execution_context();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
+            _shared_state->shared_from_this();
+
+    auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); 
++i) {
         vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
         auto& mutable_block = _shared_state->partitioned_build_blocks[i];
@@ -235,24 +263,26 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         auto* spill_io_pool =
                 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
         DCHECK(spill_io_pool != nullptr);
-        auto execution_context = state->get_task_execution_context();
 
         MonotonicStopWatch submit_timer;
         submit_timer.start();
 
-        auto st = spill_io_pool->submit_func(
-                [this, execution_context, state, spilling_stream, i, 
submit_timer] {
-                    auto execution_context_lock = execution_context.lock();
-                    if (!execution_context_lock) {
-                        LOG(INFO) << "execution_context released, maybe query 
was cancelled.";
-                        return;
-                    }
-                    
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                    SCOPED_TIMER(_spill_build_timer);
-                    (void)state; // avoid ut compile error
-                    SCOPED_ATTACH_TASK(state);
-                    _spill_to_disk(i, spilling_stream);
-                });
+        auto st = spill_io_pool->submit_func([this, query_id, mem_tracker, 
shared_state_holder,
+                                              execution_context, 
spilling_stream, i, submit_timer] {
+            SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+            std::shared_ptr<TaskExecutionContext> execution_context_lock;
+            auto shared_state_sptr = shared_state_holder.lock();
+            if (shared_state_sptr) {
+                execution_context_lock = execution_context.lock();
+            }
+            if (!shared_state_sptr || !execution_context_lock) {
+                LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
+                return;
+            }
+            _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+            SCOPED_TIMER(_spill_build_timer);
+            _spill_to_disk(i, spilling_stream);
+        });
 
         if (!st.ok()) {
             --_spilling_streams_count;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 46ab5eab619..e527d601fff 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -70,11 +70,6 @@ protected:
     Status _spill_status;
     std::mutex _spill_status_lock;
 
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;
-
     std::unique_ptr<PartitionerType> _partitioner;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 2fdd78b40b3..c6a943c59b5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -23,9 +23,9 @@
 namespace doris::pipeline {
 SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* 
parent, RuntimeState* state)
         : Base(parent, state) {
-    _finish_dependency = std::make_shared<FinishDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-            state->get_query_ctx());
+    _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                                      parent->get_name() + 
"_SPILL_DEPENDENCY",
+                                                      true, 
state->get_query_ctx());
 }
 
 Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
@@ -226,23 +226,34 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     }
 
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
+
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<SpillSortSharedState> shared_state_holder = 
_shared_state->shared_from_this();
+
     auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
     status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-            [this, state, query_id, &parent, execution_context, submit_timer] {
-                auto execution_context_lock = execution_context.lock();
-                if (!execution_context_lock) {
+            [this, state, query_id, mem_tracker, shared_state_holder, &parent, 
execution_context,
+             submit_timer] {
+                SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+                std::shared_ptr<TaskExecutionContext> execution_context_lock;
+                auto shared_state_sptr = shared_state_holder.lock();
+                if (shared_state_sptr) {
+                    execution_context_lock = execution_context.lock();
+                }
+                if (!shared_state_sptr || !execution_context_lock) {
                     LOG(INFO) << "query " << print_id(query_id)
                               << " execution_context released, maybe query was 
cancelled.";
                     return Status::OK();
                 }
 
                 
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                SCOPED_ATTACH_TASK(state);
                 Defer defer {[&]() {
                     if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
                         if (!_shared_state->sink_status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index feba33bc96d..d6557454f9d 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -46,11 +46,6 @@ private:
 
     friend class SpillSortSinkOperatorX;
 
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::shared_ptr<SpillSortSharedState> _shared_state_holder;
-
     std::unique_ptr<RuntimeState> _runtime_state;
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _partial_sort_timer = nullptr;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index a086fcc43e6..fe6b4ee3efc 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -88,15 +88,25 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
     }};
 
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
+    /// Resources in shared state will be released when the operator is closed,
+    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
+    /// So, we need hold the pointer of shared state.
+    std::weak_ptr<SpillSortSharedState> shared_state_holder = 
_shared_state->shared_from_this();
     auto query_id = state->query_id();
+    auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto spill_func = [this, state, query_id, &parent, execution_context, 
submit_timer] {
-        auto execution_context_lock = execution_context.lock();
-        if (!execution_context_lock) {
+    auto spill_func = [this, state, query_id, mem_tracker, &parent, 
shared_state_holder,
+                       execution_context, submit_timer] {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
             LOG(INFO) << "query " << print_id(query_id)
                       << " execution_context released, maybe query was 
cancelled.";
             return Status::OK();
@@ -104,7 +114,6 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_spill_merge_sort_timer);
-        SCOPED_ATTACH_TASK(state);
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h 
b/be/src/pipeline/exec/spill_sort_source_operator.h
index 1afe0597ad5..ffe8e8a6898 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -56,11 +56,6 @@ protected:
     bool _opened = false;
     Status _status;
 
-    /// Resources in shared state will be released when the operator is closed,
-    /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
-    /// So, we need hold the pointer of shared state.
-    std::shared_ptr<SpillSortSharedState> _shared_state_holder;
-
     int64_t _external_sort_bytes_threshold = 134217728; // 128M
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to