This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e48a033338 [Bug](pipeline) Support projection in UnionSourceOperator 
(#16525)
e48a033338 is described below

commit e48a033338a85101686cb05a7688af9738780d58
Author: Gabriel <[email protected]>
AuthorDate: Thu Feb 9 14:43:44 2023 +0800

    [Bug](pipeline) Support projection in UnionSourceOperator (#16525)
---
 be/src/pipeline/exec/union_source_operator.cpp     | 24 ++++++++++++++--------
 be/src/pipeline/exec/union_source_operator.h       |  2 ++
 .../suites/query_p0/join/test_join.groovy          |  3 +--
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index d6f0fcf399..b65e7b3f51 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -49,14 +49,13 @@ bool UnionSourceOperator::can_read() {
     return _need_read_for_const_expr || _data_queue->remaining_has_data();
 }
 
-Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* 
block,
-                                      SourceState& source_state) {
+Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     // here we precess const expr firstly
     if (_need_read_for_const_expr) {
-        if (this->_node->has_more_const(state)) {
-            this->_node->get_next_const(state, block);
+        if (_node->has_more_const(state)) {
+            _node->get_next_const(state, block);
         }
-        _need_read_for_const_expr = this->_node->has_more_const(state);
+        _need_read_for_const_expr = _node->has_more_const(state);
     } else {
         std::unique_ptr<vectorized::Block> output_block;
         int child_idx = 0;
@@ -69,12 +68,21 @@ Status UnionSourceOperator::get_block(RuntimeState* state, 
vectorized::Block* bl
         _data_queue->push_free_block(std::move(output_block), child_idx);
     }
 
-    bool reached_limit = false;
-    this->_node->reached_limit(block, &reached_limit);
+    _node->reached_limit(block, eos);
+    return Status::OK();
+}
+
+Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* 
block,
+                                      SourceState& source_state) {
+    bool eos = false;
+    RETURN_IF_ERROR(_node->get_next_after_projects(
+            state, block, &eos,
+            std::bind(&UnionSourceOperator::pull_data, this, 
std::placeholders::_1,
+                      std::placeholders::_2, std::placeholders::_3)));
     //have exectue const expr, queue have no data any more, and child could be 
colsed
     source_state = ((!_need_read_for_const_expr && 
!_data_queue->remaining_has_data() &&
                      _data_queue->is_all_finish()) ||
-                    reached_limit)
+                    eos)
                            ? SourceState::FINISHED
                            : SourceState::DEPEND_ON_SOURCE;
     return Status::OK();
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 53e92e9ab7..d330b45a82 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -47,6 +47,8 @@ public:
                      SourceState& source_state) override;
     bool can_read() override;
 
+    Status pull_data(RuntimeState* state, vectorized::Block* output_block, 
bool* eos);
+
 private:
     std::shared_ptr<DataQueue> _data_queue;
     bool _need_read_for_const_expr;
diff --git a/regression-test/suites/query_p0/join/test_join.groovy 
b/regression-test/suites/query_p0/join/test_join.groovy
index ff8a694f43..abacdc9270 100644
--- a/regression-test/suites/query_p0/join/test_join.groovy
+++ b/regression-test/suites/query_p0/join/test_join.groovy
@@ -804,8 +804,7 @@ suite("test_join", "query,p0") {
 
     qt_right_anti_join_null_1 "select b.k1 from ${tbName1} t right anti join 
${tbName2} b on b.k1 > t.k1 order by b.k1"
 
-    qt_right_anti_join_null_2 "select/*+SET_VAR(batch_size=3) */
- b.k1 from ${empty_name} t right anti join ${tbName2} b on b.k1 > t.k1 order 
by b.k1"
+    qt_right_anti_join_null_2 "select /*+SET_VAR(batch_size=3) */ b.k1 from 
${empty_name} t right anti join ${tbName2} b on b.k1 > t.k1 order by b.k1"
 
     // join with no join keyword
     for (s in selected){


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to