This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 64482b2e7c7e4a4aaa5cda4f5b5d7f7b3b678168
Author: Gabriel <[email protected]>
AuthorDate: Tue May 28 10:31:31 2024 +0800

    [fix](pipeline) Fix query hang if limited rows is reached (#35466)
    
    ## Proposed changes
    
    Some operators have a limit condition; the source operator should notify
    the sink operator when the limit is reached.
    Although FE has limit logic, it does not always send it.
    
    ## Further comments
    
    If this is a relatively large or complex change, kick off the discussion
    at [[email protected]](mailto:[email protected]) by explaining why
    you chose the solution you did and what alternatives you considered,
    etc...
---
 be/src/pipeline/dependency.cpp                          |  7 +++++++
 be/src/pipeline/dependency.h                            |  5 +++++
 .../local_exchange/local_exchange_sink_operator.cpp     | 11 +++++++++--
 .../local_exchange/local_exchange_source_operator.cpp   | 17 +++++++++++++++--
 .../local_exchange/local_exchange_source_operator.h     |  1 +
 be/src/pipeline/local_exchange/local_exchanger.h        |  4 ++++
 6 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index b8c55c7c68d..8cf025274af 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -184,6 +184,13 @@ void 
LocalExchangeSharedState::sub_running_sink_operators() {
     }
 }
 
+void LocalExchangeSharedState::sub_running_source_operators() {
+    std::unique_lock<std::mutex> lc(le_lock);
+    if (exchanger->_running_source_operators.fetch_sub(1) == 1) {
+        _set_always_ready();
+    }
+}
+
 LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
     source_deps.resize(num_instances, nullptr);
     mem_trackers.resize(num_instances, nullptr);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 580cb8368c8..cdc0eec3933 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -759,11 +759,16 @@ public:
         }
     };
     void sub_running_sink_operators();
+    void sub_running_source_operators();
     void _set_always_ready() {
         for (auto& dep : source_deps) {
             DCHECK(dep);
             dep->set_always_ready();
         }
+        for (auto& dep : sink_deps) {
+            DCHECK(dep);
+            dep->set_always_ready();
+        }
     }
 
     Dependency* get_dep_by_channel_id(int channel_id) { return 
source_deps[channel_id].get(); }
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 043b66b2e08..39ebbe27d88 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -65,10 +65,11 @@ std::string LocalExchangeSinkLocalState::debug_string(int 
indentation_level) con
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _release_count: {}",
+                   "_running_sink_operators: {}, _running_source_operators: 
{}, _release_count: {}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators, _release_count);
+                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
+                   _release_count);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -79,6 +80,12 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block*
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, 
local_state));
 
+    // If all exchange sources ended due to limit reached, current task should 
also finish
+    if (local_state._exchanger->_running_source_operators == 0) {
+        local_state._release_count = true;
+        local_state._shared_state->sub_running_sink_operators();
+        return Status::EndOfFile("receiver eof");
+    }
     if (eos) {
         local_state._shared_state->sub_running_sink_operators();
         local_state._release_count = true;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 1ee48e54a44..ea9f7861dbf 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -47,13 +47,26 @@ Status LocalExchangeSourceLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    if (_shared_state) {
+        _shared_state->sub_running_source_operators();
+    }
+
+    return Base::close(state);
+}
+
 std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) 
const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
-                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}",
+                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
+                   "_running_sink_operators: {}, _running_source_operators: 
{}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators);
+                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 9413b60d03b..ec662178dea 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -36,6 +36,7 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
 private:
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 1a47a1f36f5..476f479e11e 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -30,12 +30,14 @@ class Exchanger {
 public:
     Exchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
             : _running_sink_operators(running_sink_operators),
+              _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_partitions),
               _free_block_limit(free_block_limit) {}
     Exchanger(int running_sink_operators, int num_sources, int num_partitions, 
int free_block_limit)
             : _running_sink_operators(running_sink_operators),
+              _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_sources),
@@ -51,8 +53,10 @@ protected:
     friend struct LocalExchangeSharedState;
     friend struct ShuffleBlockWrapper;
     friend class LocalExchangeSourceLocalState;
+    friend class LocalExchangeSinkOperatorX;
     friend class LocalExchangeSinkLocalState;
     std::atomic<int> _running_sink_operators = 0;
+    std::atomic<int> _running_source_operators = 0;
     const int _num_partitions;
     const int _num_senders;
     const int _num_sources;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to