This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a6925cc0cfb Fix exchange operator can not aware end of file (#25562)
a6925cc0cfb is described below
commit a6925cc0cfb1ddc31ef5a20b0fb9059fbdb2ab15
Author: wangbo <[email protected]>
AuthorDate: Fri Oct 20 18:56:01 2023 +0800
Fix exchange operator can not aware end of file (#25562)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++
be/src/pipeline/exec/operator.h | 7 +------
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 31a8a1852a7..b3d2222e503 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -148,6 +148,9 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
+ if (_is_receiver_eof(ins_id.lo)) {
+ return Status::EndOfFile("receiver eof");
+ }
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4ba2aec977f..125f8fd89e4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -295,12 +295,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
- auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED);
- // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
- if (st.template is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
- }
- return st;
+ return _sink->sink(state, in_block, source_state == SourceState::FINISHED);
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]