This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e9cef9e71f9 [test](beut) add pipeline UnionOperator beut (#48984)
e9cef9e71f9 is described below
commit e9cef9e71f9a464f9807f93f95962aa43c7dd830
Author: Mryange <[email protected]>
AuthorDate: Mon Mar 17 14:09:15 2025 +0800
[test](beut) add pipeline UnionOperator beut (#48984)
### What problem does this PR solve?
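Add backend unit tests (BE UT, "beut") for the pipeline union operators (UnionSinkOperatorX and UnionSourceOperatorX), and refactor the operators so they can be tested in isolation:
- Const expr lists are now created in UnionSourceOperatorX::init and prepared/opened in UnionSourceOperatorX::prepare, instead of being cloned into the local state in UnionSourceLocalState::open; init also rejects const expr lists whose sizes do not match.
- UnionSinkOperatorX::materialize_child_block moves to the .cpp file, and a materialized child block is pushed to the data queue whenever it has rows.
- BE_TEST-only constructors and MOCK_REMOVE(final)/MOCK_FUNCTION hooks are added, together with test helpers (MockLiteral, ColumnHelper::create_block, OperatorHelper::is_block/is_ready).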
---
be/src/pipeline/exec/union_sink_operator.cpp | 64 +++--
be/src/pipeline/exec/union_sink_operator.h | 61 +----
be/src/pipeline/exec/union_source_operator.cpp | 101 ++++----
be/src/pipeline/exec/union_source_operator.h | 59 ++---
be/src/vec/exprs/vliteral.h | 4 +
be/test/pipeline/operator/operator_helper.h | 18 ++
be/test/pipeline/operator/union_operator_test.cpp | 296 ++++++++++++++++++++++
be/test/testutil/column_helper.h | 11 +
be/test/testutil/mock/mock_descriptors.h | 1 +
be/test/testutil/mock/mock_literal_expr.cpp | 47 ++++
be/test/testutil/mock/mock_literal_expr.h | 83 ++++++
11 files changed, 579 insertions(+), 166 deletions(-)
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp
b/be/src/pipeline/exec/union_sink_operator.cpp
index 4bbb5eba3e3..8de21c6ac21 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -65,27 +65,16 @@ UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int
sink_id, int dest_id, O
Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
DCHECK(tnode.__isset.union_node);
- {
- // Create result_expr_ctx_lists_ from thrift exprs.
- auto& result_texpr_lists = tnode.union_node.result_expr_lists;
- auto& texprs = result_texpr_lists[_cur_child_id];
- vectorized::VExprContextSPtrs ctxs;
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
- _child_expr = ctxs;
- }
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
+ tnode.union_node.result_expr_lists[_cur_child_id], _child_expr));
return Status::OK();
}
Status UnionSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state,
_child->row_desc()));
- RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr,
_row_descriptor));
- // open const expr lists.
- RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state));
-
- // open result expr lists.
RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
-
+ RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr,
row_descriptor()));
return Status::OK();
}
@@ -100,38 +89,45 @@ Status UnionSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block
local_state._output_block =
local_state._shared_state->data_queue.get_free_block(_cur_child_id);
}
- if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through
+ if (is_child_passthrough(_cur_child_id)) {
+ //pass_through without expr
if (in_block->rows() > 0) {
local_state._output_block->swap(*in_block);
local_state._shared_state->data_queue.push_block(std::move(local_state._output_block),
_cur_child_id);
}
- } else if (_get_first_materialized_child_idx() != children_count() &&
- _cur_child_id < children_count()) { //need materialized
- RETURN_IF_ERROR(materialize_child_block(state, _cur_child_id, in_block,
-
local_state._output_block.get()));
} else {
- return Status::InternalError("maybe can't reach here, execute const
expr: {}, {}, {}",
- _cur_child_id,
_get_first_materialized_child_idx(),
- children_count());
- }
- if (UNLIKELY(eos)) {
- //if _cur_child_id eos, need check to push block
- //Now here can't check _output_block rows, even it's row==0, also need
push block
- //because maybe sink is eos and queue have none data, if not push block
- //the source can't can_read again and can't set source finished
- if (local_state._output_block) {
+ RETURN_IF_ERROR(materialize_child_block(state, in_block,
local_state._output_block.get()));
+ if (local_state._output_block->rows() > 0) {
local_state._shared_state->data_queue.push_block(std::move(local_state._output_block),
_cur_child_id);
}
-
+ }
+ if (UNLIKELY(eos)) {
+ // set_finish will set source ready
local_state._shared_state->data_queue.set_finish(_cur_child_id);
return Status::OK();
}
- // not eos and block rows is enough to output,so push block
- if (local_state._output_block && (local_state._output_block->rows() >=
state->batch_size())) {
-
local_state._shared_state->data_queue.push_block(std::move(local_state._output_block),
- _cur_child_id);
+ return Status::OK();
+}
+
+Status UnionSinkOperatorX::materialize_child_block(RuntimeState* state,
+ vectorized::Block*
input_block,
+ vectorized::Block*
output_block) {
+ auto& local_state = get_local_state(state);
+ SCOPED_TIMER(local_state._expr_timer);
+ if (input_block->rows() > 0) {
+ vectorized::MutableBlock mutable_block =
+
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+
row_descriptor());
+ vectorized::ColumnsWithTypeAndName colunms;
+ const auto& child_exprs = local_state._child_expr;
+ for (const auto& child_expr : child_exprs) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(child_expr->execute(input_block,
&result_column_id));
+
colunms.emplace_back(input_block->get_by_position(result_column_id));
+ }
+ RETURN_IF_ERROR(mutable_block.merge(vectorized::Block {colunms}));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index 170b99f12f1..00a13bb9e8e 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -36,8 +36,7 @@ class UnionSinkOperatorX;
class UnionSinkLocalState final : public
PipelineXSinkLocalState<UnionSharedState> {
public:
ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
- UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : Base(parent, state), _child_row_idx(0) {}
+ UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) :
Base(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
friend class UnionSinkOperatorX;
@@ -47,25 +46,24 @@ public:
private:
std::unique_ptr<vectorized::Block> _output_block;
- /// Const exprs materialized by this node. These exprs don't refer to any
children.
- /// Only materialized by the first fragment instance to avoid duplication.
- vectorized::VExprContextSPtrs _const_expr;
-
/// Exprs materialized by this node. The i-th result expr list refers to
the i-th child.
vectorized::VExprContextSPtrs _child_expr;
-
- /// Index of current row in child_row_block_.
- int _child_row_idx;
RuntimeProfile::Counter* _expr_timer = nullptr;
};
-class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState>
{
+class UnionSinkOperatorX MOCK_REMOVE(final) : public
DataSinkOperatorX<UnionSinkLocalState> {
public:
using Base = DataSinkOperatorX<UnionSinkLocalState>;
friend class UnionSinkLocalState;
UnionSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool*
pool,
const TPlanNode& tnode, const DescriptorTbl& descs);
+#ifdef BE_TEST
+ UnionSinkOperatorX(int child_size, int cur_child_id, int
first_materialized_child_idx)
+ : _first_materialized_child_idx(first_materialized_child_idx),
+ _cur_child_id(cur_child_id),
+ _child_size(child_size) {}
+#endif
~UnionSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
@@ -102,13 +100,9 @@ public:
bool is_shuffled_operator() const override { return
_followed_by_shuffled_operator; }
-private:
- int _get_first_materialized_child_idx() const { return
_first_materialized_child_idx; }
-
- /// Const exprs materialized by this node. These exprs don't refer to any
children.
- /// Only materialized by the first fragment instance to avoid duplication.
- vectorized::VExprContextSPtrs _const_expr;
+ MOCK_FUNCTION const RowDescriptor& row_descriptor() { return
_row_descriptor; }
+private:
/// Exprs materialized by this node. The i-th result expr list refers to
the i-th child.
vectorized::VExprContextSPtrs _child_expr;
/// Index of the first non-passthrough child; i.e. a child that needs
materialization.
@@ -119,42 +113,13 @@ private:
const RowDescriptor _row_descriptor;
const int _cur_child_id;
const int _child_size;
- int children_count() const { return _child_size; }
+
bool is_child_passthrough(int child_idx) const {
DCHECK_LT(child_idx, _child_size);
return child_idx < _first_materialized_child_idx;
}
- Status materialize_child_block(RuntimeState* state, int child_id,
- vectorized::Block* input_block,
- vectorized::Block* output_block) {
- DCHECK_LT(child_id, _child_size);
- DCHECK(!is_child_passthrough(child_id));
- if (input_block->rows() > 0) {
- vectorized::MutableBlock mblock =
-
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
-
_row_descriptor);
- vectorized::Block res;
- RETURN_IF_ERROR(materialize_block(state, input_block, child_id,
&res));
- RETURN_IF_ERROR(mblock.merge(res));
- }
- return Status::OK();
- }
-
- Status materialize_block(RuntimeState* state, vectorized::Block*
src_block, int child_idx,
- vectorized::Block* res_block) {
- auto& local_state = get_local_state(state);
- SCOPED_TIMER(local_state._expr_timer);
- const auto& child_exprs = local_state._child_expr;
- vectorized::ColumnsWithTypeAndName colunms;
- for (size_t i = 0; i < child_exprs.size(); ++i) {
- int result_column_id = -1;
- RETURN_IF_ERROR(child_exprs[i]->execute(src_block,
&result_column_id));
- colunms.emplace_back(src_block->get_by_position(result_column_id));
- }
- local_state._child_row_idx += src_block->rows();
- *res_block = {colunms};
- return Status::OK();
- }
+ Status materialize_child_block(RuntimeState* state, vectorized::Block*
input_block,
+ vectorized::Block* output_block);
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index b381e1f2712..fb98f4c0ece 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -17,6 +17,7 @@
#include "pipeline/exec/union_source_operator.h"
+#include <algorithm>
#include <functional>
#include <utility>
@@ -55,31 +56,29 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
return Status::OK();
}
-Status UnionSourceLocalState::open(RuntimeState* state) {
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
- RETURN_IF_ERROR(Base::open(state));
-
- auto& p = _parent->cast<Parent>();
- // Const exprs materialized by this node. These exprs don't refer to any
children.
- // Only materialized by the first fragment instance to avoid duplication.
- if (state->per_fragment_instance_idx() == 0) {
- auto clone_expr_list = [&](vectorized::VExprContextSPtrs&
cur_expr_list,
- vectorized::VExprContextSPtrs&
other_expr_list) {
- cur_expr_list.resize(other_expr_list.size());
- for (int i = 0; i < cur_expr_list.size(); i++) {
- RETURN_IF_ERROR(other_expr_list[i]->clone(state,
cur_expr_list[i]));
- }
- return Status::OK();
- };
- _const_expr_lists.resize(p._const_expr_lists.size());
- for (int i = 0; i < _const_expr_lists.size(); i++) {
- auto& _const_expr_list = _const_expr_lists[i];
- auto& other_expr_list = p._const_expr_lists[i];
- RETURN_IF_ERROR(clone_expr_list(_const_expr_list,
other_expr_list));
- }
+Status UnionSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
{
+ RETURN_IF_ERROR(Base::init(tnode, state));
+ for (const auto& texprs : tnode.union_node.const_expr_lists) {
+ vectorized::VExprContextSPtrs ctxs;
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+ _const_expr_lists.push_back(ctxs);
+ }
+ if (!std::ranges::all_of(_const_expr_lists, [&](const auto& exprs) {
+ return exprs.size() == _const_expr_lists.front().size();
+ })) {
+ return Status::InternalError("Const expr lists size not match");
}
+ return Status::OK();
+}
+Status UnionSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
+ for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) {
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state,
row_descriptor()));
+ }
+ for (const auto& exprs : _const_expr_lists) {
+ RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state));
+ }
return Status::OK();
}
@@ -100,15 +99,19 @@ Status UnionSourceOperatorX::get_block(RuntimeState*
state, vectorized::Block* b
// the eos check of union operator is complex, need check all logical
if you want modify
// could ref this PR: https://github.com/apache/doris/pull/29677
// have executing const expr, queue have no data anymore, and child
could be closed
- if (_child_size == 0 && !local_state._need_read_for_const_expr) {
- *eos = true;
- } else if (_has_data(state)) {
+ if (_child_size == 0) {
+ // If _child_size == 0, eos = true will only be returned when all
constant expressions are executed
+ *eos = !local_state._need_read_for_const_expr;
+ } else if (has_data(state)) {
+ // data queue still has data, return eos = false
*eos = false;
} else if (local_state._shared_state->data_queue.is_all_finish()) {
// Here, check the value of `_has_data(state)` again after
`data_queue.is_all_finish()` is TRUE
// as there may be one or more blocks when
`data_queue.is_all_finish()` is TRUE.
- *eos = !_has_data(state);
+ *eos = !has_data(state);
} else {
+ // At this point, the data queue has no data, but the sink is not
all finished, return eos = false
+ // (this situation may be because the source consumes too fast)
*eos = false;
}
}};
@@ -128,7 +131,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
return Status::OK();
}
block->swap(*output_block);
-
output_block->clear_column_data(_row_descriptor.num_materialized_slots());
+
output_block->clear_column_data(row_descriptor().num_materialized_slots());
local_state._shared_state->data_queue.push_free_block(std::move(output_block),
child_idx);
}
local_state.reached_limit(block, eos);
@@ -137,46 +140,34 @@ Status UnionSourceOperatorX::get_block(RuntimeState*
state, vectorized::Block* b
Status UnionSourceOperatorX::get_next_const(RuntimeState* state,
vectorized::Block* block) {
DCHECK_EQ(state->per_fragment_instance_idx(), 0);
- auto& local_state =
state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
+ auto& local_state = get_local_state(state);
DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
- auto& _const_expr_list_idx = local_state._const_expr_list_idx;
- vectorized::MutableBlock mblock =
- vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block,
_row_descriptor);
- for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <
state->batch_size();
- ++_const_expr_list_idx) {
+ auto& const_expr_list_idx = local_state._const_expr_list_idx;
+ vectorized::MutableBlock mutable_block =
+ vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block,
row_descriptor());
+ for (; const_expr_list_idx < _const_expr_lists.size() &&
+ mutable_block.rows() < state->batch_size();
+ ++const_expr_list_idx) {
vectorized::Block tmp_block;
+ // When we execute a constant expression, we need one row of data
because the expr may use the block's rows for some judgments
tmp_block.insert({vectorized::ColumnUInt8::create(1),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
- int const_expr_lists_size =
cast_set<int>(_const_expr_lists[_const_expr_list_idx].size());
- if (_const_expr_list_idx && const_expr_lists_size !=
_const_expr_lists[0].size()) {
- return Status::InternalError(
- "[UnionNode]const expr at {}'s count({}) not matched({}
expected)",
- _const_expr_list_idx, const_expr_lists_size,
_const_expr_lists[0].size());
- }
-
- std::vector<int> result_list(const_expr_lists_size);
- for (size_t i = 0; i < const_expr_lists_size; ++i) {
-
RETURN_IF_ERROR(_const_expr_lists[_const_expr_list_idx][i]->execute(&tmp_block,
-
&result_list[i]));
- }
- tmp_block.erase_not_in(result_list);
- if (tmp_block.columns() != mblock.columns()) {
- return Status::InternalError(
- "[UnionNode]columns count of const expr block not matched
({} vs {})",
- tmp_block.columns(), mblock.columns());
- }
- if (tmp_block.rows() > 0) {
- RETURN_IF_ERROR(mblock.merge(tmp_block));
- tmp_block.clear();
+ vectorized::ColumnsWithTypeAndName colunms;
+ for (auto& expr : _const_expr_lists[const_expr_list_idx]) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(expr->execute(&tmp_block, &result_column_id));
+ colunms.emplace_back(tmp_block.get_by_position(result_column_id));
}
+ RETURN_IF_ERROR(mutable_block.merge(vectorized::Block {colunms}));
}
// some insert query like "insert into string_test select 1, repeat('a',
1024 * 1024);"
// the const expr will be in output expr cause the union node return a
empty block. so here we
// need add one row to make sure the union node exec const expr return at
least one row
+ /// TODO: maybe we can remove this
if (block->rows() == 0) {
block->insert({vectorized::ColumnUInt8::create(1),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 6619b623ef5..3985cf3910e 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -43,7 +43,6 @@ public:
UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) :
Base(state, parent) {};
Status init(RuntimeState* state, LocalStateInfo& info) override;
- Status open(RuntimeState* state) override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const
override;
@@ -52,49 +51,50 @@ private:
friend class OperatorX<UnionSourceLocalState>;
bool _need_read_for_const_expr {true};
int _const_expr_list_idx {0};
- std::vector<vectorized::VExprContextSPtrs> _const_expr_lists;
// If this operator has no children, there is no shared state which owns
dependency. So we
// use this local state to hold this dependency.
DependencySPtr _only_const_dependency = nullptr;
};
-class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
+/*
+There are two cases for union node: one is only constant expressions, and the
other is having other child nodes besides constant expressions.
+Unlike other union operators, the union node only merges data without
deduplication.
+
+| 0:VUNION(66)
|
+| constant exprs:
|
+| 1 | 2 | 3 | 4
|
+| 5 | 6 | 7 | 8
|
+| tuple ids: 0
|
+
+| 4:VUNION(179)
|
+| | constant exprs:
|
+| | 1 | 2 | 3 | 4
|
+| | 5 | 6 | 7 | 8
|
+| | child exprs:
|
+| | k1[#0] | k2[#1] | k3[#2] | k4[#3]
|
+| | k1[#4] | k2[#5] | k3[#6] | k4[#7]
|
+| | tuple ids: 2
|
+*/
+
+class UnionSourceOperatorX MOCK_REMOVE(final) : public
OperatorX<UnionSourceLocalState> {
public:
using Base = OperatorX<UnionSourceLocalState>;
UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs),
_child_size(tnode.num_children) {}
+
+#ifdef BE_TEST
+ UnionSourceOperatorX(int child_size) : _child_size(child_size) {}
+#endif
~UnionSourceOperatorX() override = default;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
bool is_source() const override { return true; }
- Status init(const TPlanNode& tnode, RuntimeState* state) override {
- RETURN_IF_ERROR(Base::init(tnode, state));
- DCHECK(tnode.__isset.union_node);
- // Create const_expr_ctx_lists_ from thrift exprs.
- auto& const_texpr_lists = tnode.union_node.const_expr_lists;
- for (auto& texprs : const_texpr_lists) {
- vectorized::VExprContextSPtrs ctxs;
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs,
ctxs));
- _const_expr_lists.push_back(ctxs);
- }
- return Status::OK();
- }
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
- Status prepare(RuntimeState* state) override {
- static_cast<void>(Base::prepare(state));
- // Prepare const expr lists.
- for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) {
- RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state,
_row_descriptor));
- }
- // open const expr lists.
- for (const auto& exprs : _const_expr_lists) {
- RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state));
- }
- return Status::OK();
- }
[[nodiscard]] int get_child_count() const { return _child_size; }
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
@@ -114,7 +114,7 @@ public:
}
private:
- bool _has_data(RuntimeState* state) const {
+ bool has_data(RuntimeState* state) const {
auto& local_state = get_local_state(state);
if (_child_size == 0) {
return local_state._need_read_for_const_expr;
@@ -122,9 +122,10 @@ private:
return local_state._shared_state->data_queue.remaining_has_data();
}
bool has_more_const(RuntimeState* state) const {
+ // For constant expressions, only one instance will execute the
expression
auto& local_state = get_local_state(state);
return state->per_fragment_instance_idx() == 0 &&
- local_state._const_expr_list_idx <
local_state._const_expr_lists.size();
+ local_state._const_expr_list_idx < _const_expr_lists.size();
}
friend class UnionSourceLocalState;
const int _child_size;
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 3d9c948ec88..8efb5677cb0 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -43,6 +43,10 @@ public:
}
}
+#ifdef BE_TEST
+ VLiteral() = default;
+#endif
+
Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override;
Status execute(VExprContext* context, Block* block, int* result_column_id)
override;
diff --git a/be/test/pipeline/operator/operator_helper.h
b/be/test/pipeline/operator/operator_helper.h
index a2d2c690c4e..db658ca49d7 100644
--- a/be/test/pipeline/operator/operator_helper.h
+++ b/be/test/pipeline/operator/operator_helper.h
@@ -43,6 +43,24 @@ struct OperatorHelper {
LocalStateInfo info {&ctx.profile, scan_ranges, 0, {}, 0};
EXPECT_TRUE(op.setup_local_state(&ctx.state, info).ok());
}
+
+ static bool is_block(std::vector<Dependency*> deps) {
+ for (auto* dep : deps) {
+ if (!dep->ready()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ static bool is_ready(std::vector<Dependency*> deps) {
+ for (auto* dep : deps) {
+ if (!dep->ready()) {
+ return false;
+ }
+ }
+ return true;
+ }
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/union_operator_test.cpp
b/be/test/pipeline/operator/union_operator_test.cpp
new file mode 100644
index 00000000000..b0c973f5499
--- /dev/null
+++ b/be/test/pipeline/operator/union_operator_test.cpp
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "operator_helper.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/exec/union_source_operator.h"
+#include "pipeline/operator/operator_helper.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_literal_expr.h"
+#include "testutil/mock/mock_slot_ref.h"
+#include "vec/core/block.h"
+namespace doris::pipeline {
+
+using namespace vectorized;
+
+struct MockUnionSourceOperator : public UnionSourceOperatorX {
+ MockUnionSourceOperator(int32_t child_size, DataTypes types, ObjectPool*
pool)
+ : UnionSourceOperatorX(child_size), _mock_row_descriptor(types,
pool) {}
+ RowDescriptor& row_descriptor() override { return _mock_row_descriptor; }
+ MockRowDescriptor _mock_row_descriptor;
+};
+
+struct MockUnionSinkOperator : public UnionSinkOperatorX {
+ MockUnionSinkOperator(int child_size, int cur_child_id, int
first_materialized_child_idx,
+ DataTypes types, ObjectPool* pool)
+ : UnionSinkOperatorX(child_size, cur_child_id,
first_materialized_child_idx),
+ _mock_row_descriptor(types, pool) {}
+
+ RowDescriptor& row_descriptor() override { return _mock_row_descriptor; }
+ MockRowDescriptor _mock_row_descriptor;
+};
+
+struct UnionOperatorTest : public ::testing::Test {
+ void SetUp() override {
+ state = std::make_shared<MockRuntimeState>();
+ state->batsh_size = 10;
+ for (int i = 0; i < child_size; i++) {
+ sink_state.push_back(std::make_shared<MockRuntimeState>());
+ sink_ops.push_back(nullptr);
+ }
+ }
+
+ std::shared_ptr<MockUnionSourceOperator> source_op;
+ UnionSourceLocalState* source_local_state;
+
+ std::shared_ptr<MockRuntimeState> state;
+
+ RuntimeProfile profile {""};
+
+ ObjectPool pool;
+
+ const int child_size = 3;
+ const int first_materialized_child_idx = 1;
+
+ std::vector<std::shared_ptr<MockUnionSinkOperator>> sink_ops;
+ std::vector<std::shared_ptr<MockRuntimeState>> sink_state;
+};
+
+TEST_F(UnionOperatorTest, test_all_const_expr) {
+ state->batsh_size = 2;
+ source_op.reset(new MockUnionSourceOperator {
+ 0,
+ {std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeInt64>(),
+ std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeInt64>()},
+ &pool});
+ EXPECT_TRUE(source_op->prepare(state.get()));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1,
10, 100, 1000}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2,
20, 200, 2000}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3,
30, 300, 3000}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4,
40, 400, 4000}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5,
50, 500, 5000}));
+ auto source_local_state_uptr =
+ std::make_unique<UnionSourceLocalState>(state.get(),
source_op.get());
+ source_local_state = source_local_state_uptr.get();
+ LocalStateInfo info {.parent_profile = &profile,
+ .scan_ranges = {},
+ .shared_state = nullptr,
+ .le_state_map = {},
+ .task_idx = 0};
+ EXPECT_TRUE(source_local_state->init(state.get(), info));
+ state->resize_op_id_to_local_state(-100);
+ state->emplace_local_state(source_op->operator_id(),
std::move(source_local_state_uptr));
+ EXPECT_TRUE(source_local_state->open(state.get()));
+ EXPECT_EQ(source_local_state->dependencies().size(), 1);
+ EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
+
+ EXPECT_TRUE(source_local_state->_need_read_for_const_expr);
+ EXPECT_EQ(source_local_state->_const_expr_list_idx, 0);
+
+ EXPECT_TRUE(source_op->has_more_const(state.get()));
+
+ {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 2);
+ EXPECT_FALSE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, Block {
+
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({100, 200}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({1000, 2000}),
+ }));
+ }
+
+ {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 2);
+ EXPECT_FALSE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, Block {
+
ColumnHelper::create_column_with_name<DataTypeInt64>({3, 4}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({30, 40}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({300, 400}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({3000, 4000}),
+ }));
+ }
+
+ {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 1);
+ EXPECT_TRUE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, Block {
+
ColumnHelper::create_column_with_name<DataTypeInt64>({5}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({50}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({500}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({5000}),
+ }));
+ }
+}
+
+TEST_F(UnionOperatorTest, test_sink_and_source) {
+ DataTypes types = {std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeInt64>()};
+ source_op.reset(new MockUnionSourceOperator {child_size, types, &pool});
+ EXPECT_TRUE(source_op->prepare(state.get()));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({1,
10}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({2,
20}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({3,
30}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({4,
40}));
+
source_op->_const_expr_lists.push_back(MockLiteral::create<DataTypeInt64>({5,
50}));
+
+ for (int i = 0; i < child_size; i++) {
+ sink_ops[i].reset(new MockUnionSinkOperator {child_size, i,
first_materialized_child_idx,
+ types, &pool});
+ sink_ops[i]->_child_expr = MockSlotRef::create_mock_contexts(types);
+ }
+
+ auto shared_state = sink_ops[0]->create_shared_state();
+
+ //auto & data_queue =
dynamic_cast<UnionSharedState*>(shared_state.get())->data_queue;
+ EXPECT_TRUE(shared_state != nullptr);
+ EXPECT_TRUE(sink_ops[1]->create_shared_state() == nullptr);
+ EXPECT_TRUE(sink_ops[2]->create_shared_state() == nullptr);
+
+ {
+ auto source_local_state_uptr =
+ std::make_unique<UnionSourceLocalState>(state.get(),
source_op.get());
+ source_local_state = source_local_state_uptr.get();
+
+ LocalStateInfo info {.parent_profile = &profile,
+ .scan_ranges = {},
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .task_idx = 0};
+ EXPECT_TRUE(source_local_state->init(state.get(), info));
+ state->resize_op_id_to_local_state(-100);
+ state->emplace_local_state(source_op->operator_id(),
std::move(source_local_state_uptr));
+ EXPECT_TRUE(source_local_state->open(state.get()));
+ EXPECT_EQ(source_local_state->dependencies().size(), 1);
+
EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies()));
+ }
+
+ {
+ for (int i = 0; i < child_size; i++) {
+ auto sink_local_state_uptr =
+ std::make_unique<UnionSinkLocalState>(sink_ops[i].get(),
sink_state[i].get());
+ auto* sink_local_state = sink_local_state_uptr.get();
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = &profile,
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink {}};
+ EXPECT_TRUE(sink_local_state->init(sink_state[i].get(), info));
+ sink_state[i]->resize_op_id_to_local_state(-100);
+ sink_state[i]->emplace_sink_local_state(sink_ops[i]->operator_id(),
+
std::move(sink_local_state_uptr));
+ EXPECT_TRUE(sink_local_state->open(sink_state[i].get()));
+ EXPECT_EQ(sink_local_state->dependencies().size(), 1);
+
EXPECT_TRUE(OperatorHelper::is_ready(sink_local_state->dependencies()));
+ }
+ }
+
+ {
+ Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3,
4});
+ EXPECT_TRUE(sink_ops[0]->sink(sink_state[0].get(), &block, false));
+ }
+ {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 5);
+ EXPECT_FALSE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block,
+ Block {
+
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({10, 20, 30, 40, 50}),
+ }));
+ }
+
+ EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
+
+ {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 2);
+ EXPECT_FALSE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3,
4})));
+ }
+
+ EXPECT_TRUE(OperatorHelper::is_block(source_local_state->dependencies()));
+
+ {
+ for (int i = 0; i < child_size; i++) {
+ Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2},
{3, 4});
+ EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
+ }
+ for (int i = 0; i < child_size; i++) {
+ Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2},
{3, 4});
+ EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, false));
+ }
+ for (int i = 0; i < child_size; i++) {
+ Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2},
{3, 4});
+ EXPECT_TRUE(sink_ops[i]->sink(sink_state[i].get(), &block, true));
+ }
+ }
+
+ EXPECT_TRUE(OperatorHelper::is_ready(source_local_state->dependencies()));
+ for (int i = 0; i < 8; i++) {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 2);
+ EXPECT_FALSE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3,
4})));
+ }
+
+ {
+ Block block;
+ bool eos;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos).ok());
+ EXPECT_EQ(block.rows(), 2);
+ EXPECT_TRUE(eos);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2}, {3,
4})));
+ }
+}
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/testutil/column_helper.h b/be/test/testutil/column_helper.h
index a9cf58bb880..9d35c7af62d 100644
--- a/be/test/testutil/column_helper.h
+++ b/be/test/testutil/column_helper.h
@@ -87,6 +87,17 @@ public:
return block;
}
+    // Two-column variant: "column1" holds data1 and "column2" holds data2.
+    // Both columns are described by one DataType instance created here.
+    template <typename DataType>
+    static Block create_block(const std::vector<typename DataType::FieldType>& data1,
+                              const std::vector<typename DataType::FieldType>& data2) {
+        const auto shared_type = std::make_shared<DataType>();
+        return Block({ColumnWithTypeAndName(create_column<DataType>(data1), shared_type,
+                                            "column1"),
+                      ColumnWithTypeAndName(create_column<DataType>(data2), shared_type,
+                                            "column2")});
+    }
+
template <typename DataType>
static Block create_nullable_block(const std::vector<typename
DataType::FieldType>& data,
const std::vector<typename
NullMap::value_type>& null_map) {
diff --git a/be/test/testutil/mock/mock_descriptors.h
b/be/test/testutil/mock/mock_descriptors.h
index 198c821dbe9..5419a22d38b 100644
--- a/be/test/testutil/mock/mock_descriptors.h
+++ b/be/test/testutil/mock/mock_descriptors.h
@@ -54,6 +54,7 @@ public:
tuple_desc->Slots = slots;
tuple_desc_map.push_back(tuple_desc);
_tuple_desc_map.push_back(tuple_desc);
+ _num_materialized_slots = types.size();
}
const std::vector<TupleDescriptor*>& tuple_descriptors() const override {
return tuple_desc_map;
diff --git a/be/test/testutil/mock/mock_literal_expr.cpp
b/be/test/testutil/mock/mock_literal_expr.cpp
new file mode 100644
index 00000000000..9f881a2ba13
--- /dev/null
+++ b/be/test/testutil/mock/mock_literal_expr.cpp
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "mock_literal_expr.h"
+
+#include <gtest/gtest.h>
+
+#include "testutil/column_helper.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::vectorized {
+
+// Executes one MockLiteral context per value against an initially empty block and
+// expects the block to end up with one Int64 column per literal, in creation order.
+TEST(MockLiteralTest, test) {
+    auto ctxs = MockLiteral::create<DataTypeInt64>({1, 2, 3, 4});
+    Block block;
+    for (auto& ctx : ctxs) {
+        int result_column_id = -1;
+        // Check the returned Status explicitly (as the operator tests do with .ok())
+        // and print it on failure instead of relying on an implicit bool conversion.
+        const Status st = ctx->execute(&block, &result_column_id);
+        EXPECT_TRUE(st.ok()) << st.to_string();
+    }
+
+    std::cout << block.dump_data() << std::endl;
+
+    EXPECT_TRUE(ColumnHelper::block_equal(
+            block, Block {
+                           ColumnHelper::create_column_with_name<DataTypeInt64>({1}),
+                           ColumnHelper::create_column_with_name<DataTypeInt64>({2}),
+                           ColumnHelper::create_column_with_name<DataTypeInt64>({3}),
+                           ColumnHelper::create_column_with_name<DataTypeInt64>({4}),
+                   }));
+}
+} // namespace doris::vectorized
diff --git a/be/test/testutil/mock/mock_literal_expr.h
b/be/test/testutil/mock/mock_literal_expr.h
new file mode 100644
index 00000000000..21bab1742d9
--- /dev/null
+++ b/be/test/testutil/mock/mock_literal_expr.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "testutil/column_helper.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+class SlotDescriptor;
+class RowDescriptor;
+class RuntimeState;
+class TExprNode;
+
+namespace vectorized {
+class Block;
+class VExprContext;
+
+// Test-only VLiteral whose value is a caller-supplied, already-built column.
+// prepare()/open() only set the inherited "finished" flags and always succeed.
+class MockLiteral final : public VLiteral {
+public:
+    // Stores the column, its type and its name in the inherited VLiteral members
+    // (_column_ptr, _data_type, _expr_name). explicit: a bare ColumnWithTypeAndName
+    // should not silently convert into an expression node.
+    explicit MockLiteral(ColumnWithTypeAndName data) {
+        _data_type = std::move(data.type);
+        _column_ptr = std::move(data.column);
+        _expr_name = std::move(data.name);
+    }
+
+    Status prepare(RuntimeState* state, const RowDescriptor& desc,
+                   VExprContext* context) override {
+        _prepare_finished = true;
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        _open_finished = true;
+        return Status::OK();
+    }
+
+    // Always reports the fixed name "MockLiteral"; the column name passed to the
+    // constructor is kept separately in _expr_name.
+    const std::string& expr_name() const override { return _name; }
+
+    // Builds a column from the single `value` and returns an expression context
+    // wrapping a MockLiteral over it. The context is flagged as prepared and opened
+    // so it can be executed without a prepare()/open() pass.
+    // NOTE(review): writes VExprContext::_prepared/_opened directly; presumably this
+    // relies on the test build relaxing access control — confirm.
+    template <typename DataType>
+    static VExprContextSPtr create(const typename DataType::FieldType& value) {
+        auto ctx = VExprContext::create_shared(std::make_shared<MockLiteral>(
+                ColumnHelper::create_column_with_name<DataType>({value})));
+        ctx->_prepared = true;
+        ctx->_opened = true;
+        return ctx;
+    }
+
+    // Convenience overload: one prepared/opened context per value, in input order.
+    template <typename DataType>
+    static VExprContextSPtrs create(const std::vector<typename DataType::FieldType>& values) {
+        VExprContextSPtrs ctxs;
+        ctxs.reserve(values.size());
+        for (const auto& value : values) {
+            ctxs.push_back(create<DataType>(value));
+        }
+        return ctxs;
+    }
+
+private:
+    const std::string _name = "MockLiteral";
+};
+
+} // namespace vectorized
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]