This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4d51716c [fix](pipelineX) fix null pointer when the union operator only has 
const exprs #24822
5a4d51716c is described below

commit 5a4d51716cace641f3a554f75aba3c8d95be3485
Author: Mryange <[email protected]>
AuthorDate: Sat Sep 23 20:28:38 2023 +0800

    [fix](pipelineX) fix null pointer when the union operator only has const exprs #24822
---
 be/src/pipeline/exec/union_source_operator.cpp |  4 +++-
 be/src/pipeline/exec/union_source_operator.h   |  7 +++++++
 be/src/pipeline/pipeline_x/operator.cpp        | 12 ++++++------
 3 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index 3ef5fef674..3c86fb87fb 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -152,7 +152,9 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* b
     }
     local_state.reached_limit(block, source_state);
     //have exectue const expr, queue have no data any more, and child could be 
colsed
-    if ((!_has_data(state) && 
local_state._shared_state->data_queue->is_all_finish())) {
+    if (_child_size == 0) {
+        source_state = SourceState::FINISHED;
+    } else if ((!_has_data(state) && 
local_state._shared_state->data_queue->is_all_finish())) {
         source_state = SourceState::FINISHED;
     } else if (_has_data(state)) {
         source_state = SourceState::MORE_DATA;
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 59212c5a65..acd8419575 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -95,6 +95,9 @@ public:
             : Base(pool, tnode, descs), _child_size(tnode.num_children) {};
     ~UnionSourceOperatorX() override = default;
     Dependency* wait_for_dependency(RuntimeState* state) override {
+        if (_child_size == 0) {
+            return nullptr;
+        }
         CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
         return local_state._dependency->read_blocked_by();
     }
@@ -133,9 +136,13 @@ public:
         }
         return Status::OK();
     }
+    int get_child_count() const { return _child_size; }
 
 private:
     bool _has_data(RuntimeState* state) {
+        if (_child_size == 0) {
+            return false;
+        }
         auto& local_state = 
state->get_local_state(id())->cast<UnionSourceLocalState>();
         return local_state._shared_state->data_queue->remaining_has_data();
     }
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 42c5842d0f..8d228623c5 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -307,9 +307,6 @@ Status 
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
 template <typename LocalStateType>
 Status OperatorX<LocalStateType>::setup_local_states(RuntimeState* state,
                                                      
std::vector<LocalStateInfo>& infos) {
-    if (infos.size() > 1) {
-        LOG_WARNING("herr");
-    }
     DCHECK(infos.size() == 1) << infos.size();
     for (auto& info : infos) {
         RETURN_IF_ERROR(setup_local_state(state, info));
@@ -320,15 +317,18 @@ Status 
OperatorX<LocalStateType>::setup_local_states(RuntimeState* state,
 template <>
 Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* 
state,
                                                             
std::vector<LocalStateInfo>& infos) {
+    int child_count = 
static_cast<pipeline::UnionSourceOperatorX*>(this)->get_child_count();
     std::shared_ptr<DataQueue> data_queue;
     for (auto& info : infos) {
         auto local_state = UnionSourceLocalState::create_shared(state, this);
         state->emplace_local_state(id(), local_state);
         RETURN_IF_ERROR(local_state->init(state, info));
-        if (!data_queue) {
-            data_queue = local_state->data_queue();
+        if (child_count != 0) {
+            if (!data_queue) {
+                data_queue = local_state->data_queue();
+            }
+            local_state->_shared_state->data_queue = data_queue;
         }
-        local_state->_shared_state->data_queue = data_queue;
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to