This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a11a590368d [fix](pipeline) Fix query hang up if limited rows is 
reached (#35513)
a11a590368d is described below

commit a11a590368da90975cc8195731f2ac8660f6a747
Author: Gabriel <[email protected]>
AuthorDate: Fri May 31 21:55:46 2024 +0800

    [fix](pipeline) Fix query hang up if limited rows is reached (#35513)
    
    Follow-up for #35466.
    
    We should ensure that closed tasks will not block other tasks.
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 41 +++++++++++++---------
 be/src/pipeline/exec/exchange_sink_buffer.h        | 11 +++---
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  9 +++--
 be/src/pipeline/exec/exchange_sink_operator.h      |  3 ++
 be/src/pipeline/exec/operator.h                    |  8 +++++
 .../local_exchange_source_operator.cpp             |  8 +++++
 be/src/pipeline/local_exchange/local_exchanger.cpp | 25 +++++++++++++
 be/src/pipeline/local_exchange/local_exchanger.h   |  4 +++
 be/src/pipeline/pipeline_fragment_context.h        |  8 +++++
 be/src/pipeline/pipeline_task.cpp                  | 12 ++++---
 be/src/pipeline/pipeline_task.h                    |  6 ++++
 be/src/runtime/fragment_mgr.cpp                    |  3 ++
 12 files changed, 109 insertions(+), 29 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index d66e5ebd680..8893db54cc5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -84,7 +84,8 @@ std::shared_ptr<BroadcastPBlockHolder> 
BroadcastPBlockHolderQueue::pop() {
 namespace pipeline {
 
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id, int send_id,
-                                       int be_number, RuntimeState* state)
+                                       int be_number, RuntimeState* state,
+                                       ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
@@ -93,9 +94,8 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, 
PlanNodeId dest_node_
           _sender_id(send_id),
           _be_number(be_number),
           _state(state),
-          _context(state->get_query_ctx()) {}
-
-ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
+          _context(state->get_query_ctx()),
+          _parent(parent) {}
 
 void ExchangeSinkBuffer::close() {
     // Could not clear the queue here, because there maybe a running rpc want 
to
@@ -213,8 +213,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
-        _rpc_channel_is_idle[id] = true;
-        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+        _turn_off_channel(id);
         return Status::OK();
     }
 
@@ -372,8 +371,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         broadcast_q.pop();
     } else {
-        _rpc_channel_is_idle[id] = true;
-        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+        _turn_off_channel(id);
     }
 
     return Status::OK();
@@ -403,26 +401,21 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[id]);
-        if (!_rpc_channel_is_idle[id]) {
-            _rpc_channel_is_idle[id] = true;
-            _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
-        }
+        _turn_off_channel(id);
     }
 }
 
 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
     _is_finishing = true;
     _context->cancel(Status::Cancelled(err));
-    _ended(id);
+    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+    _turn_off_channel(id, true);
 }
 
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
-    if (!_rpc_channel_is_idle[id]) {
-        _rpc_channel_is_idle[id] = true;
-        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
-    }
+    _turn_off_channel(id, true);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
     swap(empty, _instance_to_broadcast_package_queue[id]);
 }
@@ -432,6 +425,20 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) 
{
     return _instance_to_receiver_eof[id];
 }
 
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
+    if (!_rpc_channel_is_idle[id]) {
+        _rpc_channel_is_idle[id] = true;
+        auto all_done = _busy_channels.fetch_sub(1) == 1;
+        _set_ready_to_finish(all_done);
+        if (cleanup && all_done) {
+            auto weak_task_ctx = weak_task_exec_ctx();
+            if (auto pip_ctx = weak_task_ctx.lock()) {
+                _parent->set_reach_limit();
+            }
+        }
+    }
+}
+
 void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* 
min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2955ff959de..8eed559e712 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -47,6 +47,7 @@ using InstanceLoId = int64_t;
 
 namespace pipeline {
 class Dependency;
+class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
@@ -192,11 +193,11 @@ struct ExchangeRpcContext {
 };
 
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
-class ExchangeSinkBuffer : public HasTaskExecutionCtx {
+class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
     ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int 
send_id, int be_number,
-                       RuntimeState* state);
-    ~ExchangeSinkBuffer();
+                       RuntimeState* state, ExchangeSinkLocalState* parent);
+    ~ExchangeSinkBuffer() = default;
     void register_sink(TUniqueId);
 
     Status add_block(TransmitInfo&& request);
@@ -265,6 +266,7 @@ private:
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
     inline bool _is_receiver_eof(InstanceLoId id);
+    inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
 
@@ -272,7 +274,8 @@ private:
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _finish_dependency = nullptr;
     std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
-    std::atomic<bool> _should_stop {false};
+    std::atomic<bool> _should_stop = false;
+    ExchangeSinkLocalState* _parent = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 244184bc7a3..67994c85cfa 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -132,7 +132,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, 
_sender_id,
-                                                        _state->be_number(), 
state);
+                                                        _state->be_number(), 
state, this);
 
     register_channels(_sink_buffer.get());
     _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
@@ -624,8 +624,11 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
-    fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, 
_busy_channels = {})",
-                   _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load());
+    fmt::format_to(debug_string_buffer,
+                   ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, 
_is_finishing = {}), "
+                   "_reach_limit: {}",
+                   _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load(),
+                   _sink_buffer->_is_finishing.load(), _reach_limit.load());
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 72fdbc3354d..fa72db6702f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -100,6 +100,8 @@ public:
     RuntimeProfile::Counter* compress_timer() { return _compress_timer; }
     RuntimeProfile::Counter* uncompressed_bytes_counter() { return 
_uncompressed_bytes_counter; }
     [[nodiscard]] bool transfer_large_data_by_brpc() const;
+    bool is_finished() const override { return _reach_limit.load(); }
+    void set_reach_limit() { _reach_limit = true; };
 
     [[nodiscard]] int sender_id() const { return _sender_id; }
 
@@ -199,6 +201,7 @@ private:
 
     // for external table sink hash partition
     std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
+    std::atomic<bool> _reach_limit = false;
 };
 
 class ExchangeSinkOperatorX final : public 
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a24a58f883b..4eef9589a23 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -312,6 +312,7 @@ public:
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
+    [[nodiscard]] virtual bool is_finished() const { return false; }
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level) 
const = 0;
 
@@ -445,6 +446,13 @@ public:
 
     Status prepare(RuntimeState* state) override { return Status::OK(); }
     Status open(RuntimeState* state) override { return Status::OK(); }
+    [[nodiscard]] bool is_finished(RuntimeState* state) const {
+        auto result = state->get_sink_local_state_result();
+        if (!result) {
+            return result.error();
+        }
+        return result.value()->is_finished();
+    }
 
     [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* 
block, bool eos) = 0;
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index ea9f7861dbf..1c8dff51a29 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -52,6 +52,9 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
         return Status::OK();
     }
 
+    if (_exchanger) {
+        _exchanger->close(*this);
+    }
     if (_shared_state) {
         _shared_state->sub_running_source_operators();
     }
@@ -67,6 +70,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
                    _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
+    size_t i = 0;
+    fmt::format_to(debug_string_buffer, ", MemTrackers: ");
+    for (auto* mem_tracker : _shared_state->mem_trackers) {
+        fmt::format_to(debug_string_buffer, "{}: {}, ", i, 
mem_tracker->consumption());
+    }
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 97310948307..1c03b28415d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -39,6 +39,16 @@ Status ShuffleExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block,
     return Status::OK();
 }
 
+void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
+    PartitionedBlock partitioned_block;
+    while 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+        auto block_wrapper = partitioned_block.first;
+        local_state._shared_state->sub_mem_usage(
+                local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
+        block_wrapper->unref(local_state._shared_state);
+    }
+}
+
 Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                    LocalExchangeSourceLocalState& local_state) 
{
     PartitionedBlock partitioned_block;
@@ -182,6 +192,14 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     return Status::OK();
 }
 
+void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
+    vectorized::Block next_block;
+    while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        local_state._shared_state->sub_mem_usage(local_state._channel_id,
+                                                 next_block.allocated_bytes());
+    }
+}
+
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
@@ -255,6 +273,13 @@ Status BroadcastExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
     return Status::OK();
 }
 
+void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
+    vectorized::Block next_block;
+    while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        // do nothing
+    }
+}
+
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 476f479e11e..bc07c806094 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -48,6 +48,7 @@ public:
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool 
eos,
                         LocalExchangeSinkLocalState& local_state) = 0;
     virtual ExchangeType get_type() const = 0;
+    virtual void close(LocalExchangeSourceLocalState& local_state) {}
 
 protected:
     friend struct LocalExchangeSharedState;
@@ -108,6 +109,7 @@ public:
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
+    void close(LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return 
ExchangeType::HASH_SHUFFLE; }
 
 protected:
@@ -150,6 +152,7 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; 
}
+    void close(LocalExchangeSourceLocalState& local_state) override;
 
 private:
     std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
@@ -188,6 +191,7 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
+    void close(LocalExchangeSourceLocalState& local_state) override;
 
 private:
     std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 8bc1eb29139..94dd96731c2 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -131,6 +131,14 @@ public:
         }
     }
 
+    void clear_finished_tasks() {
+        for (size_t j = 0; j < _tasks.size(); j++) {
+            for (size_t i = 0; i < _tasks[j].size(); i++) {
+                _tasks[j][i]->stop_if_finished();
+            }
+        }
+    };
+
 private:
     Status _build_pipelines(ObjectPool* pool, const 
doris::TPipelineFragmentParams& request,
                             const DescriptorTbl& descs, OperatorXPtr* root, 
PipelinePtr cur_pipe);
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index d26b0fce387..52a76828804 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -273,6 +273,7 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
     SCOPED_ATTACH_TASK(_state);
+    _eos = _sink->is_finished(_state) || _eos;
     *eos = _eos;
     if (_eos) {
         // If task is waken up by finish dependency, `_eos` is set to true by 
last execution, and we should return here.
@@ -334,14 +335,15 @@ Status PipelineTask::execute(bool* eos) {
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task 
executing failed");
             return status;
         });
-        // Pull block from operator chain
-        if (!_dry_run) {
+        // `_dry_run` means sink operator need no more data
+        // `_sink->is_finished(_state)` means sink operator should be finished
+        if (_dry_run || _sink->is_finished(_state)) {
+            *eos = true;
+            _eos = true;
+        } else {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
             
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, 
block, eos));
-        } else {
-            *eos = true;
-            _eos = true;
         }
 
         if (_block->rows() != 0 || *eos) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index bb6587eec28..6bc65905be6 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -222,6 +222,12 @@ public:
 
     std::string task_name() const { return fmt::format("task{}({})", _index, 
_pipeline->_name); }
 
+    void stop_if_finished() {
+        if (_sink->is_finished(_state)) {
+            clear_blocking_state();
+        }
+    }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 852179e98e9..69fff2951ef 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1046,6 +1046,9 @@ void FragmentMgr::cancel_worker() {
                     
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
                 }
             }
+            for (auto& pipeline_itr : _pipeline_map) {
+                pipeline_itr.second->clear_finished_tasks();
+            }
             for (auto it = _query_ctx_map.begin(); it != 
_query_ctx_map.end();) {
                 if (auto q_ctx = it->second.lock()) {
                     if (q_ctx->is_timeout(now) && 
q_ctx->enable_pipeline_x_exec()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to