This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 99810f1ea5 [Bug](pipeline) fix hang on union_source_operator when
child sink_operator all finished (#20938)
99810f1ea5 is described below
commit 99810f1ea57e2072ced11c08938cf2dfe331ef94
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 19 09:46:38 2023 +0800
[Bug](pipeline) fix hang on union_source_operator when child sink_operator
all finished (#20938)
---
be/src/pipeline/exec/union_source_operator.cpp | 10 +++++++---
be/src/pipeline/exec/union_source_operator.h | 2 ++
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index 6efd0bfc78..f39e0a582b 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -46,10 +46,14 @@
UnionSourceOperator::UnionSourceOperator(OperatorBuilderBase* operator_builder,
_data_queue(queue),
_need_read_for_const_expr(true) {};
+bool UnionSourceOperator::_has_data() {
+ return _need_read_for_const_expr || _data_queue->remaining_has_data();
+}
+
// we assumed it can read to process const expr, Although we don't know
whether there is
// ,and queue have data, could read also
bool UnionSourceOperator::can_read() {
- return _need_read_for_const_expr || _data_queue->remaining_has_data();
+ return _has_data() || _data_queue->is_all_finish();
}
Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block*
block, bool* eos) {
@@ -83,9 +87,9 @@ Status UnionSourceOperator::get_block(RuntimeState* state,
vectorized::Block* bl
std::bind(&UnionSourceOperator::pull_data, this,
std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)));
//have exectue const expr, queue have no data any more, and child could be
colsed
- if (eos || (!can_read() && _data_queue->is_all_finish())) {
+ if (eos || (!_has_data() && _data_queue->is_all_finish())) {
source_state = SourceState::FINISHED;
- } else if (can_read()) {
+ } else if (_has_data()) {
source_state = SourceState::MORE_DATA;
} else {
source_state = SourceState::DEPEND_ON_SOURCE;
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 33a6bef587..8051bdd512 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -59,6 +59,8 @@ public:
Status pull_data(RuntimeState* state, vectorized::Block* output_block,
bool* eos);
private:
+ bool _has_data();
+
std::shared_ptr<DataQueue> _data_queue;
bool _need_read_for_const_expr;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]