This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 07dd6830e8 [pipelineX](refactor) add union node in pipelineX (#24286)
07dd6830e8 is described below
commit 07dd6830e82a0c9b39d6efc06d1aeb25900a843b
Author: Mryange <[email protected]>
AuthorDate: Wed Sep 13 20:39:58 2023 +0800
[pipelineX](refactor) add union node in pipelineX (#24286)
---
be/src/pipeline/exec/union_sink_operator.cpp | 90 ++++++++++++++++++
be/src/pipeline/exec/union_sink_operator.h | 103 ++++++++++++++++++++
be/src/pipeline/exec/union_source_operator.cpp | 35 +++++++
be/src/pipeline/exec/union_source_operator.h | 41 ++++++++
be/src/pipeline/pipeline_x/dependency.h | 15 +++
be/src/pipeline/pipeline_x/operator.cpp | 8 +-
be/src/pipeline/pipeline_x/operator.h | 17 +++-
.../pipeline_x/pipeline_x_fragment_context.cpp | 33 ++++++-
.../pipeline_x/pipeline_x_fragment_context.h | 1 +
be/src/pipeline/pipeline_x/pipeline_x_task.h | 6 +-
.../data/pipelineX/test_union_operator.out | 13 +++
.../suites/pipelineX/test_union_operator.groovy | 105 +++++++++++++++++++++
12 files changed, 459 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp
b/be/src/pipeline/exec/union_sink_operator.cpp
index 9296df78d9..c1fd75d820 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -94,4 +94,94 @@ Status UnionSinkOperator::close(RuntimeState* state) {
return StreamingOperator::close(state);
}
+// Initializes the per-instance sink state: after the base init, gives this instance its own
+// clone of each of the parent operator's child result expression contexts (presumably so that
+// parallel sink instances do not share expression execution state).
+Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& parent = _parent->cast<Parent>();
+    const size_t expr_count = parent._child_expr.size();
+    _child_expr.resize(expr_count);
+    for (size_t idx = 0; idx != expr_count; ++idx) {
+        RETURN_IF_ERROR(parent._child_expr[idx]->clone(state, _child_expr[idx]));
+    }
+    return Status::OK();
+}
+
+// Builds the sink that feeds union child `child_id` into the union's shared data queue.
+// `sink_id` becomes this operator's id; the union node id (`tnode.node_id`) is passed to the
+// base as the destination id. `pool` is not used by this constructor.
+UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool,
+                                       const TPlanNode& tnode, const DescriptorTbl& descs)
+        : Base(sink_id, tnode.node_id),
+          _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx),
+          _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
+          _cur_child_id(child_id),
+          _child_size(tnode.num_children) {}
+
+// Initializes the operator from the union plan node: builds the expression trees of the
+// result expr list that belongs to this sink's child (`_cur_child_id`).
+Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
+    DCHECK(tnode.__isset.union_node);
+    const auto& per_child_texprs = tnode.union_node.result_expr_lists;
+    vectorized::VExprContextSPtrs built_ctxs;
+    RETURN_IF_ERROR(
+            vectorized::VExpr::create_expr_trees(per_child_texprs[_cur_child_id], built_ctxs));
+    _child_expr = built_ctxs;
+    return Status::OK();
+}
+
+// Prepares this child's result expressions against the row layout produced by the child.
+Status UnionSinkOperatorX::prepare(RuntimeState* state) {
+    const auto& child_row_desc = _child_x->row_desc();
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, child_row_desc));
+    return Status::OK();
+}
+
+// Opens the constant expression list first, then this child's result expression list,
+// stopping at the first failure.
+Status UnionSinkOperatorX::open(RuntimeState* state) {
+    for (auto* expr_list : {&_const_expr, &_child_expr}) {
+        RETURN_IF_ERROR(vectorized::VExpr::open(*expr_list, state));
+    }
+    return Status::OK();
+}
+
+// Consumes one block from this sink's child and forwards it to the union's shared DataQueue.
+// Pass-through children hand their block over unchanged; other children have their result
+// expressions evaluated and appended to a pending output block, which is pushed once it reaches
+// the batch size or when the child reports FINISHED.
+Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                SourceState source_state) {
+    auto& ls = state->get_sink_local_state(id())->cast<UnionSinkLocalState>();
+    auto& queue = *ls._shared_state->_data_queue;
+    const int first_materialized = _get_first_materialized_child_idx();
+
+    // Make sure there is a block to write into, taking one from the queue's free list.
+    if (!ls._output_block) {
+        ls._output_block = queue.get_free_block(_cur_child_id);
+    }
+
+    if (_cur_child_id < first_materialized) {
+        // Pass-through child: forward the input block as-is.
+        if (in_block->rows() > 0) {
+            ls._output_block->swap(*in_block);
+            queue.push_block(std::move(ls._output_block), _cur_child_id);
+        }
+    } else if (first_materialized == children_count() || _cur_child_id >= children_count()) {
+        return Status::InternalError("maybe can't reach here, execute const expr: {}, {}, {}",
+                                     _cur_child_id, first_materialized, children_count());
+    } else {
+        // Child that needs materialization: evaluate its exprs into the pending output block.
+        RETURN_IF_ERROR(materialize_child_block(state, _cur_child_id, in_block,
+                                                ls._output_block.get()));
+    }
+
+    if (UNLIKELY(source_state == SourceState::FINISHED)) {
+        // This child is at end of input. Push the pending block even if it has no rows:
+        // otherwise the queue may hold no data for this child and the source could never
+        // become readable again to observe that the child has finished.
+        if (ls._output_block) {
+            queue.push_block(std::move(ls._output_block), _cur_child_id);
+        }
+        queue.set_finish(_cur_child_id);
+        return Status::OK();
+    }
+
+    // Not finished yet: push only once a full batch has accumulated.
+    if (ls._output_block && ls._output_block->rows() >= state->batch_size()) {
+        queue.push_block(std::move(ls._output_block), _cur_child_id);
+    }
+    return Status::OK();
+}
+
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index 0da75147cc..2bbfab5cfc 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/core/block.h"
#include "vec/exec/vunion_node.h"
@@ -64,5 +65,107 @@ private:
std::shared_ptr<DataQueue> _data_queue;
std::unique_ptr<vectorized::Block> _output_block;
};
+
+class UnionSinkOperatorX;
+
+/// Per-instance state of a union sink: this instance's clones of the child result
+/// expressions (filled by init()) and the block being built for the shared DataQueue.
+class UnionSinkLocalState final : public PipelineXSinkLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
+    using Base = PipelineXSinkLocalState<UnionDependency>;
+    using Parent = UnionSinkOperatorX;
+    friend class UnionSinkOperatorX;
+
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+
+private:
+    /// Block currently being filled; moved into the DataQueue whenever it is pushed.
+    std::unique_ptr<vectorized::Block> _output_block;
+
+    /// Const exprs of the union node (they do not reference any child).
+    /// NOTE(review): not referenced by the union sink code in this change.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// This instance's clones of the parent's result exprs for its child.
+    vectorized::VExprContextSPtrs _child_expr;
+
+    /// Running count of input rows passed through materialize_block().
+    int _child_row_idx {0};
+};
+
+/// Sink side of a union: one instance per union child, pushing that child's rows (as-is for
+/// pass-through children, otherwise after evaluating the child's result exprs) into the
+/// DataQueue shared with UnionSourceOperatorX.
+class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<UnionSinkLocalState>;
+
+    friend class UnionSinkLocalState;
+    /// @param child_id index of the union child this sink consumes.
+    /// @param sink_id  operator id of this sink (the union node id is used as dest id).
+    UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode,
+                       const DescriptorTbl& descs);
+    ~UnionSinkOperatorX() override = default;
+
+    // Union sinks are always initialised from the union TPlanNode (overload below); a
+    // TDataSink-based init is a programming error. The operator name fills the `{}`.
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink", get_name());
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    // This sink never reports itself as blocked.
+    bool can_write(RuntimeState* state) override { return true; }
+
+private:
+    int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
+
+    /// Const exprs of the union node; they don't refer to any child.
+    /// NOTE(review): init() never fills this list, so open() currently opens an empty list.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// Result exprs for this sink's child (the `_cur_child_id`-th list of the union node).
+    vectorized::VExprContextSPtrs _child_expr;
+    /// Index of the first non-passthrough child, i.e. the first child that needs
+    /// materialization. 0 when all children are materialized, the number of children when
+    /// none are.
+    const int _first_materialized_child_idx;
+
+    const RowDescriptor _row_descriptor;
+    const int _cur_child_id;
+    const int _child_size;
+    int children_count() const { return _child_size; }
+    bool is_child_passthrough(int child_idx) const {
+        DCHECK_LT(child_idx, _child_size);
+        return child_idx < _first_materialized_child_idx;
+    }
+
+    /// Evaluates the child's result exprs on `input_block` and appends the produced columns to
+    /// `output_block` (reusing its memory where possible). An empty input block is a no-op.
+    Status materialize_child_block(RuntimeState* state, int child_id,
+                                   vectorized::Block* input_block,
+                                   vectorized::Block* output_block) {
+        DCHECK_LT(child_id, _child_size);
+        DCHECK(!is_child_passthrough(child_id));
+        if (input_block->rows() > 0) {
+            vectorized::MutableBlock mblock =
+                    vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+                                                                               _row_descriptor);
+            vectorized::Block res;
+            RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res));
+            RETURN_IF_ERROR(mblock.merge(res));
+        }
+        return Status::OK();
+    }
+
+    /// Executes this instance's child exprs against `src_block` and collects their result
+    /// columns into `res_block`. `child_idx` is not read by this function.
+    Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx,
+                             vectorized::Block* res_block) {
+        auto& local_state = state->get_sink_local_state(id())->cast<UnionSinkLocalState>();
+        const auto& child_exprs = local_state._child_expr;
+        vectorized::ColumnsWithTypeAndName columns;
+        columns.reserve(child_exprs.size());
+        for (const auto& expr_ctx : child_exprs) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(expr_ctx->execute(src_block, &result_column_id));
+            columns.emplace_back(src_block->get_by_position(result_column_id));
+        }
+        local_state._child_row_idx += src_block->rows();
+        *res_block = {columns};
+        return Status::OK();
+    }
+};
+
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index f39e0a582b..c1fdae4851 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -97,5 +97,40 @@ Status UnionSourceOperator::get_block(RuntimeState* state,
vectorized::Block* bl
return Status::OK();
}
+
+// Initializes the source-side state and creates the DataQueue held in the shared state,
+// sized by the number of union children; the union sinks push their blocks into it.
+Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    const auto& parent = _parent->cast<Parent>();
+    _shared_state->_data_queue = std::make_shared<DataQueue>(parent._child_size);
+    return Status::OK();
+}
+
+// Pulls the next block pushed by any union child from the shared DataQueue, returns the
+// emptied container to that child's free list, and reports the resulting source state:
+// FINISHED when the row limit is reached or when every child has finished and no data is
+// left, MORE_DATA while blocks remain queued, otherwise DEPEND_ON_SOURCE.
+Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                       SourceState& source_state) {
+    auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>();
+    bool eos = false;
+    std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique();
+    int child_idx = 0;
+    local_state._shared_state->_data_queue->get_block_from_queue(&output_block, &child_idx);
+    if (output_block) {
+        block->swap(*output_block);
+        output_block->clear_column_data(row_desc().num_materialized_slots());
+        // Hand the emptied block back so the sink of child `child_idx` can reuse it.
+        local_state._shared_state->_data_queue->push_free_block(std::move(output_block),
+                                                                child_idx);
+        local_state.reached_limit(block, &eos);
+    }
+    // Even when nothing was dequeued, fall through so that source_state is always refreshed
+    // instead of being left at a stale value. A reached limit (eos) ends the source
+    // immediately; otherwise it ends once all children finished and the queue is drained.
+    if (eos || (!_has_data(state) && local_state._shared_state->_data_queue->is_all_finish())) {
+        source_state = SourceState::FINISHED;
+    } else if (_has_data(state)) {
+        source_state = SourceState::MORE_DATA;
+    } else {
+        source_state = SourceState::DEPEND_ON_SOURCE;
+    }
+    return Status::OK();
+}
+
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 8bd2f484f3..580a7d4a15 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -22,6 +22,7 @@
#include "common/status.h"
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/vunion_node.h"
namespace doris {
@@ -67,6 +68,46 @@ private:
std::shared_ptr<DataQueue> _data_queue;
bool _need_read_for_const_expr;
};
+class UnionSourceOperatorX;
+
+/// Per-instance state of the union source. Its init() creates the DataQueue stored in the
+/// shared state, which the union sinks push into.
+class UnionSourceLocalState final : public PipelineXLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSourceLocalState);
+    using Base = PipelineXLocalState<UnionDependency>;
+    using Parent = UnionSourceOperatorX;
+    friend class UnionSourceOperatorX;
+
+    UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    bool _need_read_for_const_expr {false};
+};
+
+/// Source side of a union: reads the blocks that the per-child union sinks pushed into the
+/// shared DataQueue and emits them downstream.
+class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
+public:
+    using Base = OperatorX<UnionSourceLocalState>;
+    UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+            : Base(pool, tnode, descs), _child_size(tnode.num_children) {}
+    ~UnionSourceOperatorX() override = default;
+
+    // Readable only once every child has marked its part of the queue finished, so all
+    // child output is buffered in the DataQueue before the first block is read.
+    bool can_read(RuntimeState* state) override { return _queue_of(state).is_all_finish(); }
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+    bool is_source() const override { return true; }
+
+private:
+    // The DataQueue held in this instance's shared state.
+    DataQueue& _queue_of(RuntimeState* state) {
+        auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>();
+        return *local_state._shared_state->_data_queue;
+    }
+    bool _has_data(RuntimeState* state) { return _queue_of(state).remaining_has_data(); }
+    bool has_more_const(const RuntimeState* state) const {
+        return state->per_fragment_instance_idx() == 0;
+    }
+    friend class UnionSourceLocalState;
+    const int _child_size;
+};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 597337ddfa..41c9299f01 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -157,6 +157,21 @@ private:
SortSharedState _sort_state;
};
+struct UnionSharedState {
+    // Queue that the union sinks push blocks into and the union source reads from;
+    // created by UnionSourceLocalState::init().
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+// Dependency carrying the union's shared state. It is keyed by the union node id, so the
+// union source and its child sinks can resolve to the same instance.
+class UnionDependency final : public Dependency {
+public:
+    using SharedState = UnionSharedState;
+    UnionDependency(int id) : Dependency(id, "UnionDependency") {}
+    ~UnionDependency() override = default;
+
+    void* shared_state() override { return static_cast<void*>(&_union_state); }
+
+private:
+    UnionSharedState _union_state;
+};
struct AnalyticSharedState {
public:
AnalyticSharedState() = default;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 14b7fbfd21..328dd43cb0 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -39,6 +39,8 @@
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
#include "util/debug_util.h"
namespace doris::pipeline {
@@ -268,7 +270,7 @@ Status
DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
template <typename LocalStateType>
void DataSinkOperatorX<LocalStateType>::get_dependency(DependencySPtr&
dependency) {
- dependency.reset(new typename LocalStateType::Dependency(id()));
+ // The dependency id is dest_id(), not id(). dest_id() equals id() for sinks built with the
+ // single-argument constructor. Sinks given an explicit destination (e.g. the union sinks,
+ // which pass the union node id) presumably share the id of the operator they feed.
+ dependency.reset(new typename LocalStateType::Dependency(dest_id()));
}
template <typename LocalStateType>
@@ -327,6 +329,7 @@ DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
DECLARE_OPERATOR_X(StreamingAggSinkLocalState)
DECLARE_OPERATOR_X(ExchangeSinkLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
+DECLARE_OPERATOR_X(UnionSinkLocalState)
#undef DECLARE_OPERATOR_X
@@ -341,6 +344,7 @@ DECLARE_OPERATOR_X(RepeatLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR_X(AssertNumRowsLocalState)
DECLARE_OPERATOR_X(EmptySetLocalState)
+DECLARE_OPERATOR_X(UnionSourceLocalState)
#undef DECLARE_OPERATOR_X
@@ -357,6 +361,7 @@ template class
PipelineXSinkLocalState<NestedLoopJoinDependency>;
template class PipelineXSinkLocalState<AnalyticDependency>;
template class PipelineXSinkLocalState<AggDependency>;
template class PipelineXSinkLocalState<FakeDependency>;
+template class PipelineXSinkLocalState<UnionDependency>;
template class PipelineXLocalState<HashJoinDependency>;
template class PipelineXLocalState<SortDependency>;
@@ -364,5 +369,6 @@ template class
PipelineXLocalState<NestedLoopJoinDependency>;
template class PipelineXLocalState<AnalyticDependency>;
template class PipelineXLocalState<AggDependency>;
template class PipelineXLocalState<FakeDependency>;
+template class PipelineXLocalState<UnionDependency>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 12b3d371a4..e8a129795b 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -398,7 +398,10 @@ protected:
class DataSinkOperatorXBase : public OperatorBase {
public:
- DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id) {}
+ DataSinkOperatorXBase(const int id) : OperatorBase(nullptr), _id(id),
_dest_id(id) {}
+
+ DataSinkOperatorXBase(const int id, const int dest_id)
+ : OperatorBase(nullptr), _id(id), _dest_id(dest_id) {}
virtual ~DataSinkOperatorXBase() override = default;
@@ -465,6 +468,8 @@ public:
[[nodiscard]] int id() const override { return _id; }
+ [[nodiscard]] int dest_id() const { return _dest_id; }
+
[[nodiscard]] std::string get_name() const override { return _name; }
Status finalize(RuntimeState* state) override { return Status::OK(); }
@@ -473,6 +478,7 @@ public:
protected:
const int _id;
+ const int _dest_id;
std::string _name;
// Maybe this will be transferred to BufferControlBlock.
@@ -488,7 +494,8 @@ class DataSinkOperatorX : public DataSinkOperatorXBase {
public:
DataSinkOperatorX(const int id) : DataSinkOperatorXBase(id) {}
- virtual ~DataSinkOperatorX() override = default;
+ DataSinkOperatorX(const int id, const int source_id) :
DataSinkOperatorXBase(id, source_id) {}
+ ~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
@@ -501,7 +508,7 @@ public:
using Dependency = DependencyType;
PipelineXSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalStateBase(parent, state) {}
- virtual ~PipelineXSinkLocalState() {}
+ ~PipelineXSinkLocalState() override = default;
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info)
override {
_dependency = (DependencyType*)info.dependency;
@@ -514,7 +521,7 @@ public:
return Status::OK();
}
- virtual Status close(RuntimeState* state) override {
+ Status close(RuntimeState* state) override {
if (_closed) {
return Status::OK();
}
@@ -522,7 +529,7 @@ public:
return Status::OK();
}
- virtual std::string debug_string(int indentation_level) const override;
+ std::string debug_string(int indentation_level) const override;
protected:
DependencyType* _dependency;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 8faa2a76b8..03af3aabdd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -64,6 +64,8 @@
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -296,7 +298,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
_runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
-
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx],
_total_tasks++,
@@ -360,6 +361,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
}
_build_side_pipelines.clear();
+ _union_child_pipelines.clear();
_dag.clear();
// register the profile of child data stream sender
// for (auto& sender : _multi_cast_stream_sink_senders) {
@@ -504,6 +506,9 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end()
&& child_idx > 0) {
cur_pipe = _build_side_pipelines[parent_idx];
}
+ if (_union_child_pipelines.find(parent_idx) !=
_union_child_pipelines.end()) {
+ cur_pipe = _union_child_pipelines[parent_idx][child_idx];
+ }
std::stringstream error_msg;
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
@@ -588,6 +593,32 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_build_side_pipelines.insert({sink->id(), build_side_pipe});
break;
}
+ case TPlanNodeType::UNION_NODE: {
+ int child_count = tnode.num_children;
+ op.reset(new UnionSourceOperatorX(pool, tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
+ int father_id = tnode.node_id;
+ for (int i = 0; i < child_count; i++) {
+ PipelinePtr build_side_pipe = add_pipeline();
+ _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
+ DataSinkOperatorXPtr sink;
+ sink.reset(new UnionSinkOperatorX(i, father_id + 1000 * (i + 1),
pool, tnode, descs));
+ RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+ RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
+ if (_union_child_pipelines.find(father_id) ==
_union_child_pipelines.end()) {
+ _union_child_pipelines.insert({father_id, {build_side_pipe}});
+ } else {
+ _union_child_pipelines[father_id].push_back(build_side_pipe);
+ }
+ }
+
+ break;
+ }
case TPlanNodeType::SORT_NODE: {
op.reset(new SortSourceOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index a0970b166e..86796fe8e0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -156,6 +156,7 @@ private:
std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
std::mutex _state_map_lock;
+ std::map<int, std::vector<PipelinePtr>> _union_child_pipelines;
};
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index c9e1103a58..7e2458b12a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -104,7 +104,11 @@ public:
DependencySPtr& get_downstream_dependency() { return
_downstream_dependency; }
+ // Registers an upstream dependency keyed by its id(). If a dependency with that id is
+ // already registered, the caller's pointer is replaced by the registered one instead, so
+ // every caller using the same id (e.g. all child sinks of one union) shares one object.
void set_upstream_dependency(DependencySPtr& upstream_dependency) {
- _upstream_dependency.insert({upstream_dependency->id(),
upstream_dependency});
+ if (_upstream_dependency.contains(upstream_dependency->id())) {
+ upstream_dependency =
_upstream_dependency[upstream_dependency->id()];
+ } else {
+ _upstream_dependency.insert({upstream_dependency->id(),
upstream_dependency});
+ }
}
Dependency* get_upstream_dependency(int id) {
diff --git a/regression-test/data/pipelineX/test_union_operator.out
b/regression-test/data/pipelineX/test_union_operator.out
new file mode 100644
index 0000000000..439b88e790
--- /dev/null
+++ b/regression-test/data/pipelineX/test_union_operator.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !pipeline --
+20
+
+-- !pipeline --
+21
+
+-- !pipelineX --
+20
+
+-- !pipelineX --
+21
+
diff --git a/regression-test/suites/pipelineX/test_union_operator.groovy
b/regression-test/suites/pipelineX/test_union_operator.groovy
new file mode 100644
index 0000000000..59448c35ec
--- /dev/null
+++ b/regression-test/suites/pipelineX/test_union_operator.groovy
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Checks that UNION produces the same distinct-row counts on the pipeline engine and on the
+// pipelineX engine (the expected output lists identical results for both passes).
+suite("test_union_operator") {
+    sql """ DROP TABLE IF EXISTS UNIONNODE """
+    sql """
+        CREATE TABLE IF NOT EXISTS UNIONNODE (
+            `k1` INT(11) NULL COMMENT "",
+            `k2` INT(11) NULL COMMENT "",
+            `k3` INT(11) NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "storage_format" = "V2"
+        );
+    """
+    sql """ set forbid_unknown_col_stats = false """
+    sql """
+        INSERT INTO UNIONNODE (k1, k2, k3) VALUES
+        (5, 10, 15),
+        (8, 12, 6),
+        (3, 7, 11),
+        (9, 4, 14),
+        (2, 13, 1),
+        (6, 20, 16),
+        (11, 17, 19),
+        (7, 18, 8),
+        (12, 9, 2),
+        (4, 15, 10),
+        (16, 3, 13),
+        (10, 1, 7),
+        (14, 5, 12),
+        (19, 6, 4),
+        (1, 2, 18),
+        (13, 11, 3),
+        (18, 8, 5),
+        (15, 19, 9),
+        (17, 14, 17),
+        (20, 16, 45);
+    """
+
+    // First pass: pipeline engine. pipelineX is switched off explicitly so this pass does
+    // not depend on the session default.
+    sql """set enable_pipeline_engine = true, experimental_enable_pipeline_x_engine = false, parallel_pipeline_task_num = 8; """
+
+    qt_pipeline """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION
+            SELECT k2 FROM UNIONNODE
+        ) AS merged_result;
+    """
+    qt_pipeline """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION
+            SELECT k2 FROM UNIONNODE
+            UNION
+            SELECT k3 FROM UNIONNODE
+        ) AS merged_result;
+    """
+
+    // Second pass: the same queries on the pipelineX engine.
+    sql """set experimental_enable_pipeline_x_engine = true, parallel_pipeline_task_num = 8; """
+
+    qt_pipelineX """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION
+            SELECT k2 FROM UNIONNODE
+        ) AS merged_result;
+    """
+    qt_pipelineX """
+        SELECT count(*)
+        FROM (
+            SELECT k1 FROM UNIONNODE
+            UNION
+            SELECT k2 FROM UNIONNODE
+            UNION
+            SELECT k3 FROM UNIONNODE
+        ) AS merged_result;
+    """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]