This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a803e1493ac [pipeline](fix) Set upstream operators always runnable 
once source op… (#37325)
a803e1493ac is described below

commit a803e1493ac04078e3aa64a9e2ee36097c3173a3
Author: Gabriel <[email protected]>
AuthorDate: Fri Jul 5 13:54:34 2024 +0800

    [pipeline](fix) Set upstream operators always runnable once source op… 
(#37325)
    
    …erator closed (#37297)
    
    Some kinds of source operators has a 1-1 relationship with a sink
    operator (such as AnalyticOperator). We must ensure AnalyticSinkOperator
    will not be blocked if AnalyticSourceOperator already closed.
    
    pick #37297
---
 be/src/pipeline/pipeline_x/operator.cpp | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index a7b3e4ad340..7e047851d40 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -459,6 +459,11 @@ Status 
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
     _closed = true;
+    // Some kinds of source operators has a 1-1 relationship with a sink 
operator (such as AnalyticOperator).
+    // We must ensure AnalyticSinkOperator will not be blocked if 
AnalyticSourceOperator already closed.
+    if (_shared_state && _shared_state->sink_deps.size() == 1) {
+        _shared_state->sink_deps.front()->set_always_ready();
+    }
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to