This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ae1fb80ac32 [fix](pipeline) incorrect result caused by missing output block of union operator (#29677)
ae1fb80ac32 is described below

commit ae1fb80ac32cc5130cfe5f2b3cc3c67b5251b875
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Jan 9 09:42:42 2024 +0800

    [fix](pipeline) incorrect result caused by missing output block of union operator (#29677)
---
 be/src/pipeline/exec/union_source_operator.cpp | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index b42f941d608..e8ef1ba7207 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -88,11 +88,15 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
             state, block, &eos,
             std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
                       std::placeholders::_2, std::placeholders::_3)));
-    //have exectue const expr, queue have no data any more, and child could be colsed
-    if (eos || (!_has_data() && _data_queue->is_all_finish())) {
+    //have executing const expr, queue have no data anymore, and child could be closed.
+    if (eos) { // reach limit
         source_state = SourceState::FINISHED;
     } else if (_has_data()) {
         source_state = SourceState::MORE_DATA;
+    } else if (_data_queue->is_all_finish()) {
+        // Here, check the value of `_has_data(state)` again after `data_queue.is_all_finish()` is TRUE
+        // as there may be one or more blocks when `data_queue.is_all_finish()` is TRUE.
+        source_state = _has_data() ? SourceState::MORE_DATA : SourceState::FINISHED;
     } else {
         source_state = SourceState::DEPEND_ON_SOURCE;
     }
@@ -185,13 +189,15 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
         local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx);
     }
     local_state.reached_limit(block, source_state);
-    //have exectue const expr, queue have no data any more, and child could be colsed
+    //have executing const expr, queue have no data anymore, and child could be closed
     if (_child_size == 0 && !local_state._need_read_for_const_expr) {
         source_state = SourceState::FINISHED;
-    } else if ((!_has_data(state) && local_state._shared_state->data_queue.is_all_finish())) {
-        source_state = SourceState::FINISHED;
     } else if (_has_data(state)) {
         source_state = SourceState::MORE_DATA;
+    } else if (local_state._shared_state->data_queue.is_all_finish()) {
+        // Here, check the value of `_has_data(state)` again after `data_queue.is_all_finish()` is TRUE
+        // as there may be one or more blocks when `data_queue.is_all_finish()` is TRUE.
+        source_state = _has_data(state) ? SourceState::MORE_DATA : SourceState::FINISHED;
     } else {
         source_state = SourceState::DEPEND_ON_SOURCE;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to