This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 261c65f72da [fix](pipeline) only sub_running_sink_operators in close 
#43500 (#43726)
261c65f72da is described below

commit 261c65f72da1967ce78b981119d1ec9426b4a738
Author: Mryange <[email protected]>
AuthorDate: Sat Nov 16 20:58:47 2024 +0800

    [fix](pipeline) only sub_running_sink_operators in close #43500 (#43726)
    
    https://github.com/apache/doris/pull/43500
    ### What problem does this PR solve?
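    Previously, sub_running_sink_operators was called only when the sink
    encountered EOS or when all sources were closed. However, this approach
    has issues: the user may manually cancel the query, in which case there
    may be no EOS and the sources may not be closed. This would prevent
    running_sink_operators from reaching zero, leading to errors such as
    the stuck task shown below.
    This change moves the call into LocalExchangeSinkLocalState::close(),
    and removes the two calls from LocalExchangeSinkOperatorX::sink(), so
    the counter is decremented when the sink's local state is closed.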
    ```
    PipelineTask[this = 0x7fc369fe9600, id = 0, open = true, eos = false, 
finish = false, dry run = false, elapse time = 26361.740784032s], block 
dependency = NULL, is running = true
    operators:
    LOCAL_EXCHANGE_OPERATOR (LOCAL_MERGE_SORT): id=-5, parallel_tasks=4, 
_channel_id: 0, _num_partitions: 4, _num_senders: 4, _num_sources: 4, 
_running_sink_operators: 1, _running_source_operators: 1, mem_usage: 0, data 
queue info: Data Queue 0: [size approx = 0, eos = false], MemTrackers: 0: 0, 1: 
34537472, 2: 5701632, 3: 0,
      DATA_STREAM_SINK_OPERATOR: id=6, Sink Buffer: (_should_stop = false, 
_busy_channels = 0, _is_finishing = false), _reach_limit: false
    0.   this=0x7fc376438f10, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, 
block task = 0, ready=true, _always_ready=true
    0.   this=0x7fc3764bc110, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, 
block task = 0, ready=true, _always_ready=true
    0.   this=0x7fc3764bc310, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, 
block task = 0, ready=true, _always_ready=true
    0.   this=0x7fc3764bc510, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, 
block task = 0, ready=true, _always_ready=true
    ```
    - [x] Confirm test cases
    - [x] Confirm document
    - [x] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../local_exchange/local_exchange_sink_operator.cpp      | 16 ++++++++++++----
 .../local_exchange/local_exchange_sink_operator.h        |  2 ++
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 473e6625ee2..2f3c84c25e2 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -52,6 +52,18 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_close_timer);
+    if (Base::_closed) {
+        return Status::OK();
+    }
+    if (_shared_state) {
+        _shared_state->sub_running_sink_operators();
+    }
+    return Base::close(state, exec_status);
+}
+
 std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) 
const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
@@ -109,12 +121,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
     // If all exchange sources ended due to limit reached, current task should 
also finish
     if (local_state._exchanger->_running_source_operators == 0) {
-        local_state._shared_state->sub_running_sink_operators();
         return Status::EndOfFile("receiver eof");
     }
-    if (eos) {
-        local_state._shared_state->sub_running_sink_operators();
-    }
 
     return Status::OK();
 }
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 737fb90541d..33c0255c2b6 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -40,6 +40,8 @@ public:
     Status open(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
+    Status close(RuntimeState* state, Status exec_status) override;
+
 private:
     friend class LocalExchangeSinkOperatorX;
     friend class ShuffleExchanger;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to