This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 258b1d3a847 [fix](pipeline) only sub_running_sink_operators in close
(#43500)
258b1d3a847 is described below
commit 258b1d3a847d0ab47ef458214e1c65f7db677b64
Author: Mryange <[email protected]>
AuthorDate: Mon Nov 11 11:15:19 2024 +0800
[fix](pipeline) only sub_running_sink_operators in close (#43500)
Previously, sub_running_sink_operators was called only when encountering
EOS during sink
or when all sources were closed. However, this approach has issues, as
it’s possible
for the user to manually cancel, in which case there may be no EOS and
the sources may
not be closed. This would prevent running_sink_operators from reaching
zero, leading to errors.
```
PipelineTask[this = 0x7fc369fe9600, id = 0, open = true, eos = false,
finish = false, dry run = false, elapse time = 26361.740784032s], block
dependency = NULL, is running = true
operators:
LOCAL_EXCHANGE_OPERATOR (LOCAL_MERGE_SORT): id=-5, parallel_tasks=4,
_channel_id: 0, _num_partitions: 4, _num_senders: 4, _num_sources: 4,
_running_sink_operators: 1, _running_source_operators: 1, mem_usage: 0, data
queue info: Data Queue 0: [size approx = 0, eos = false], MemTrackers: 0: 0, 1:
34537472, 2: 5701632, 3: 0,
DATA_STREAM_SINK_OPERATOR: id=6, Sink Buffer: (_should_stop = false,
_busy_channels = 0, _is_finishing = false), _reach_limit: false
0. this=0x7fc376438f10, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5,
block task = 0, ready=true, _always_ready=true
0. this=0x7fc3764bc110, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5,
block task = 0, ready=true, _always_ready=true
0. this=0x7fc3764bc310, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5,
block task = 0, ready=true, _always_ready=true
0. this=0x7fc3764bc510, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5,
block task = 0, ready=true, _always_ready=true
```
---
.../local_exchange/local_exchange_sink_operator.cpp | 16 ++++++++++++----
.../local_exchange/local_exchange_sink_operator.h | 1 +
2 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index ff243186c47..a939d25654b 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -110,6 +110,18 @@ Status LocalExchangeSinkLocalState::open(RuntimeState*
state) {
return Status::OK();
}
+Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status
exec_status) {
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_close_timer);
+ if (Base::_closed) {
+ return Status::OK();
+ }
+ if (_shared_state) {
+ _shared_state->sub_running_sink_operators();
+ }
+ return Base::close(state, exec_status);
+}
+
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level)
const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
@@ -132,12 +144,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
// If all exchange sources ended due to limit reached, current task should
also finish
if (local_state._exchanger->_running_source_operators == 0) {
- local_state._shared_state->sub_running_sink_operators();
return Status::EndOfFile("receiver eof");
}
- if (eos) {
- local_state._shared_state->sub_running_sink_operators();
- }
return Status::OK();
}
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 09b1f2cc310..4c4a400c2bd 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -45,6 +45,7 @@ public:
Status open(RuntimeState* state) override;
std::string debug_string(int indentation_level) const override;
std::vector<Dependency*> dependencies() const override;
+ Status close(RuntimeState* state, Status exec_status) override;
private:
friend class LocalExchangeSinkOperatorX;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]