This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 4bb0e9edce2 Fix exchange operator not being aware of end of file #25562 (#26241)
4bb0e9edce2 is described below

commit 4bb0e9edce25c731973eaa88c6457abe2b0e1b04
Author: wangbo <[email protected]>
AuthorDate: Wed Nov 1 22:00:56 2023 +0800

    Fix exchange operator not being aware of end of file #25562 (#26241)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++
 be/src/pipeline/exec/operator.h               | 7 +------
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 9407e24dbcd..696e0643880 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -124,6 +124,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         return Status::OK();
     }
     TUniqueId ins_id = request.channel->_fragment_instance_id;
+    if (_is_receiver_eof(ins_id.lo)) {
+        return Status::EndOfFile("receiver eof");
+    }
     bool send_now = false;
     {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 38ed45ed89f..d98b4f16bd4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -276,12 +276,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
         if (in_block->rows() > 0) {
-            auto st = _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
-            // TODO: improvement: if sink returned END_OF_FILE, pipeline task 
can be finished
-            if (st.template is<ErrorCode::END_OF_FILE>()) {
-                return Status::OK();
-            }
-            return st;
+            return _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
         }
         return Status::OK();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to