This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5a4d51716c [fix](pipelineX) fix null ptr when unionoperator only have
constexpr #24822
5a4d51716c is described below
commit 5a4d51716cace641f3a554f75aba3c8d95be3485
Author: Mryange <[email protected]>
AuthorDate: Sat Sep 23 20:28:38 2023 +0800
[fix](pipelineX) fix null ptr when unionoperator only have constexpr #24822
---
be/src/pipeline/exec/union_source_operator.cpp | 4 +++-
be/src/pipeline/exec/union_source_operator.h | 7 +++++++
be/src/pipeline/pipeline_x/operator.cpp | 12 ++++++------
3 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index 3ef5fef674..3c86fb87fb 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -152,7 +152,9 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
}
local_state.reached_limit(block, source_state);
//have exectue const expr, queue have no data any more, and child could be
colsed
- if ((!_has_data(state) &&
local_state._shared_state->data_queue->is_all_finish())) {
+ if (_child_size == 0) {
+ source_state = SourceState::FINISHED;
+ } else if ((!_has_data(state) &&
local_state._shared_state->data_queue->is_all_finish())) {
source_state = SourceState::FINISHED;
} else if (_has_data(state)) {
source_state = SourceState::MORE_DATA;
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 59212c5a65..acd8419575 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -95,6 +95,9 @@ public:
: Base(pool, tnode, descs), _child_size(tnode.num_children) {};
~UnionSourceOperatorX() override = default;
Dependency* wait_for_dependency(RuntimeState* state) override {
+ if (_child_size == 0) {
+ return nullptr;
+ }
CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
return local_state._dependency->read_blocked_by();
}
@@ -133,9 +136,13 @@ public:
}
return Status::OK();
}
+ int get_child_count() const { return _child_size; }
private:
bool _has_data(RuntimeState* state) {
+ if (_child_size == 0) {
+ return false;
+ }
auto& local_state =
state->get_local_state(id())->cast<UnionSourceLocalState>();
return local_state._shared_state->data_queue->remaining_has_data();
}
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 42c5842d0f..8d228623c5 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -307,9 +307,6 @@ Status
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
template <typename LocalStateType>
Status OperatorX<LocalStateType>::setup_local_states(RuntimeState* state,
std::vector<LocalStateInfo>& infos) {
- if (infos.size() > 1) {
- LOG_WARNING("herr");
- }
DCHECK(infos.size() == 1) << infos.size();
for (auto& info : infos) {
RETURN_IF_ERROR(setup_local_state(state, info));
@@ -320,15 +317,18 @@ Status
OperatorX<LocalStateType>::setup_local_states(RuntimeState* state,
template <>
Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState*
state,
std::vector<LocalStateInfo>& infos) {
+ int child_count =
static_cast<pipeline::UnionSourceOperatorX*>(this)->get_child_count();
std::shared_ptr<DataQueue> data_queue;
for (auto& info : infos) {
auto local_state = UnionSourceLocalState::create_shared(state, this);
state->emplace_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
- if (!data_queue) {
- data_queue = local_state->data_queue();
+ if (child_count != 0) {
+ if (!data_queue) {
+ data_queue = local_state->data_queue();
+ }
+ local_state->_shared_state->data_queue = data_queue;
}
- local_state->_shared_state->data_queue = data_queue;
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]