This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 4bb0e9edce2 Fix exchange operator can not aware end of file #25562
(#26241)
4bb0e9edce2 is described below
commit 4bb0e9edce25c731973eaa88c6457abe2b0e1b04
Author: wangbo <[email protected]>
AuthorDate: Wed Nov 1 22:00:56 2023 +0800
Fix exchange operator can not aware end of file #25562 (#26241)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++
be/src/pipeline/exec/operator.h | 7 +------
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 9407e24dbcd..696e0643880 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -124,6 +124,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
+ if (_is_receiver_eof(ins_id.lo)) {
+ return Status::EndOfFile("receiver eof");
+ }
bool send_now = false;
{
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 38ed45ed89f..d98b4f16bd4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -276,12 +276,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
if (in_block->rows() > 0) {
- auto st = _sink->send(state, in_block, source_state ==
SourceState::FINISHED);
- // TODO: improvement: if sink returned END_OF_FILE, pipeline task
can be finished
- if (st.template is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
- }
- return st;
+ return _sink->send(state, in_block, source_state ==
SourceState::FINISHED);
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]