This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0813d701224 [test](beut) add pipeline DistinctStreamingAggOperator
beut (#48805)
0813d701224 is described below
commit 0813d70122405d71d3ca320139eaaba571b468df
Author: Mryange <[email protected]>
AuthorDate: Mon Mar 10 14:22:03 2025 +0800
[test](beut) add pipeline DistinctStreamingAggOperator beut (#48805)
### What problem does this PR solve?
Add a backend unit test (beut) for the pipeline DistinctStreamingAggOperator.
---
.../distinct_streaming_aggregation_operator.cpp | 88 +++------
.../exec/distinct_streaming_aggregation_operator.h | 20 ++-
be/src/pipeline/exec/operator.h | 3 +
...istinct_streaming_aggregation_operator_test.cpp | 198 +++++++++++++++++++++
be/test/testutil/mock/mock_descriptors.h | 1 +
be/test/testutil/mock/mock_slot_ref.cpp | 11 ++
be/test/testutil/mock/mock_slot_ref.h | 2 +
7 files changed, 250 insertions(+), 73 deletions(-)
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 24df662bb57..dc266dc9ae1 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -44,13 +44,13 @@ struct StreamingHtMinReductionEntry {
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
- {0, 0.0},
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
// At present, The L2 cache is generally 1024k or more
- {1024 * 1024, 0.0},
+ {.min_ht_mem = 1024 * 1024, .streaming_ht_min_reduction = 0.0},
// Expand into main memory if we're getting a significant reduction.
// The L3 cache is generally 16MB or more
- {16 * 1024 * 1024, 2.0},
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
};
static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
@@ -85,15 +85,10 @@ Status DistinctStreamingAggLocalState::open(RuntimeState*
state) {
SCOPED_TIMER(Base::_open_timer);
RETURN_IF_ERROR(Base::open(state));
auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
- for (auto& evaluator : p._aggregate_evaluators) {
- _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
- }
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
}
-
- DCHECK_EQ(p._total_size_of_aggregate_states, 0);
RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
return Status::OK();
}
@@ -192,15 +187,21 @@ Status
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}
}
- size_t rows = in_block->rows();
+ const size_t rows = in_block->rows();
_distinct_row.clear();
- _distinct_row.reserve(rows);
+
if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg &&
low_memory_mode()) {
_stop_emplace_flag = true;
}
if (!_stop_emplace_flag) {
+ // _distinct_row is used to calculate non-duplicate data in key_columns
+ // _emplace_into_hash_table_to_distinct will determine whether to
continue inserting data into the hashmap
+ // If it decides not to insert data, it will set _stop_emplace_flag =
true and _distinct_row will be empty
+ _distinct_row.reserve(rows);
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
+ DCHECK_LE(_distinct_row.size(), rows)
+ << "_distinct_row size should be less than or equal to rows";
}
bool mem_reuse =
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
@@ -216,6 +217,7 @@ Status
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}
DCHECK_EQ(out_block->columns(), key_size);
if (_stop_emplace_flag && _distinct_row.empty()) {
+ // If _stop_emplace_flag is true and _distinct_row is also empty,
it means it is in streaming mode, outputting what is input
// swap the column directly, to solve Check failed:
d.column->use_count() == 1 (2 vs. 1)
for (int i = 0; i < key_size; ++i) {
auto output_column = out_block->get_by_position(i).column;
@@ -224,6 +226,7 @@ Status
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}
} else {
DCHECK_EQ(_cache_block.rows(), 0);
+ // If the output rows would exceed batch_size, split the excess into _cache_block
if (out_block->rows() + _distinct_row.size() > batch_size) {
size_t split_size = batch_size - out_block->rows();
for (int i = 0; i < key_size; ++i) {
@@ -243,6 +246,7 @@ Status
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}
}
} else {
+ DCHECK(out_block->empty()) << "out_block must be empty , but rows is "
<< out_block->rows();
vectorized::ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
if (_stop_emplace_flag) {
@@ -320,7 +324,6 @@
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
const
DescriptorTbl& descs,
bool
require_bucket_distribution)
: StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode,
operator_id, descs),
- _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase),
@@ -328,8 +331,7 @@
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
- _require_bucket_distribution(require_bucket_distribution),
- _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _require_bucket_distribution(require_bucket_distribution) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
@@ -346,32 +348,22 @@ Status DistinctStreamingAggOperatorX::init(const
TPlanNode& tnode, RuntimeState*
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_trees(tnode.agg_node.grouping_exprs,
_probe_expr_ctxs));
- // init aggregate functions
- _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
-
- TSortInfo dummy;
- for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
- vectorized::AggFnEvaluator* evaluator = nullptr;
- RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(
- _pool, tnode.agg_node.aggregate_functions[i],
- tnode.agg_node.__isset.agg_sort_infos ?
tnode.agg_node.agg_sort_infos[i] : dummy,
- tnode.agg_node.grouping_exprs.empty(), &evaluator));
- _aggregate_evaluators.push_back(evaluator);
- }
-
_op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR";
return Status::OK();
}
Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
- _intermediate_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
- _output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
- DCHECK_EQ(_intermediate_tuple_desc->slots().size(),
_output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state,
_child->row_desc()));
+ RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
+ init_make_nullable(state);
+ return Status::OK();
+}
- size_t j = _probe_expr_ctxs.size();
- for (size_t i = 0; i < j; ++i) {
+void DistinctStreamingAggOperatorX::init_make_nullable(RuntimeState* state) {
+ _output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+
+ for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
if (nullable_output != nullable_input) {
@@ -379,40 +371,6 @@ Status
DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
_make_nullable_keys.emplace_back(i);
}
}
- for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
- SlotDescriptor* intermediate_slot_desc =
_intermediate_tuple_desc->slots()[j];
- SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
- RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
- state, _child->row_desc(), intermediate_slot_desc,
output_slot_desc));
- _aggregate_evaluators[i]->set_version(state->be_exec_version());
- }
-
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- const auto& agg_function = _aggregate_evaluators[i]->function();
- _total_size_of_aggregate_states += agg_function->size_of_data();
-
- // If not the last aggregate_state, we need pad it so that next
aggregate_state will be aligned.
- if (i + 1 < _aggregate_evaluators.size()) {
- size_t alignment_of_next_state =
- _aggregate_evaluators[i + 1]->function()->align_of_data();
- if ((alignment_of_next_state & (alignment_of_next_state - 1)) !=
0) {
- return Status::RuntimeError("Logical error: align_of_data is
not 2^N");
- }
-
- /// Extend total_size to next alignment requirement
- /// Add padding by rounding up 'total_size_of_aggregate_states' to
be a multiplier of alignment_of_next_state.
- _total_size_of_aggregate_states =
- (_total_size_of_aggregate_states + alignment_of_next_state
- 1) /
- alignment_of_next_state * alignment_of_next_state;
- }
- }
- RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
-
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
- }
-
- return Status::OK();
}
Status DistinctStreamingAggOperatorX::push(RuntimeState* state,
vectorized::Block* in_block,
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 1066ea37236..bf2be9d850b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -74,7 +74,6 @@ private:
const int batch_size;
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
- std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
@@ -97,6 +96,15 @@ class DistinctStreamingAggOperatorX final
public:
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
const DescriptorTbl& descs, bool
require_bucket_distribution);
+#ifdef BE_TEST
+ DistinctStreamingAggOperatorX()
+ : _needs_finalize(false),
+ _is_first_phase(true),
+ _partition_exprs({}),
+ _is_colocate(false),
+ _require_bucket_distribution {false} {}
+#endif
+
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos)
const override;
@@ -119,9 +127,7 @@ public:
private:
friend class DistinctStreamingAggLocalState;
- TupleId _intermediate_tuple_id;
- TupleDescriptor* _intermediate_tuple_desc = nullptr;
-
+ void init_make_nullable(RuntimeState* state);
TupleId _output_tuple_id;
TupleDescriptor* _output_tuple_desc = nullptr;
const bool _needs_finalize;
@@ -131,12 +137,10 @@ private:
const bool _require_bucket_distribution;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
- std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
std::vector<size_t> _make_nullable_keys;
- /// The total size of the row from the aggregate functions.
- size_t _total_size_of_aggregate_states = 0;
+
+ // If _is_streaming_preagg = true, deduplication will be abandoned in
cases where the deduplication rate is low.
bool _is_streaming_preagg = false;
- const bool _without_key;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 8900d6fe844..8542f879980 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -1024,6 +1024,9 @@ public:
StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int
operator_id,
const DescriptorTbl& descs)
: OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {}
+#ifdef BE_TEST
+ StatefulOperatorX() = default;
+#endif
virtual ~StatefulOperatorX() = default;
using OperatorX<LocalStateType>::get_local_state;
diff --git
a/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
new file mode 100644
index 00000000000..c1e92272739
--- /dev/null
+++ b/be/test/pipeline/operator/distinct_streaming_aggregation_operator_test.cpp
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/operator/operator_helper.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_slot_ref.h"
+#include "vec/core/block.h"
+namespace doris::pipeline {
+
+using namespace vectorized;
+struct DistinctStreamingAggOperatorTest : public ::testing::Test {
+ void SetUp() override {
+ op = std::make_unique<DistinctStreamingAggOperatorX>();
+ mock_op = std::make_shared<MockOperatorX>();
+ state = std::make_shared<MockRuntimeState>();
+ state->batsh_size = 10;
+ op->_child = mock_op;
+ }
+
+ void create_op(DataTypes input_types, DataTypes output_types) {
+ op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(input_types);
+
+ op->_output_tuple_id = 0;
+ output_desc_tbl = std::make_unique<MockDescriptorTbl>(output_types,
&pool);
+ state->set_desc_tbl(output_desc_tbl.get());
+
+ op->init_make_nullable(state.get());
+
+ create_local_state();
+ }
+
+ void create_local_state() {
+ local_state_uptr =
std::make_unique<DistinctStreamingAggLocalState>(state.get(), op.get());
+ local_state = local_state_uptr.get();
+ LocalStateInfo info {.parent_profile = &profile,
+ .scan_ranges = {},
+ .shared_state = nullptr,
+ .le_state_map = {},
+ .task_idx = 0};
+ EXPECT_TRUE(local_state->init(state.get(), info));
+ state->resize_op_id_to_local_state(-100);
+ state->emplace_local_state(op->operator_id(),
std::move(local_state_uptr));
+ EXPECT_TRUE(local_state->open(state.get()));
+ }
+
+ RuntimeProfile profile {"test"};
+ std::unique_ptr<DistinctStreamingAggOperatorX> op;
+ std::unique_ptr<MockDescriptorTbl> output_desc_tbl;
+ std::shared_ptr<MockOperatorX> mock_op;
+
+ std::unique_ptr<DistinctStreamingAggLocalState> local_state_uptr;
+
+ DistinctStreamingAggLocalState* local_state;
+
+ std::shared_ptr<MockRuntimeState> state;
+ ObjectPool pool;
+};
+
+TEST_F(DistinctStreamingAggOperatorTest, test1) {
+ op->_is_streaming_preagg = false;
+
+ create_op({std::make_shared<DataTypeInt64>()},
{std::make_shared<DataTypeInt64>()});
+
+ mock_op->_outout_blocks.push_back(
+ ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4,
1, 2, 3, 4}));
+
+ {
+ bool eos = false;
+ Block block;
+
+ auto st = op->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ EXPECT_TRUE(eos);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3,
4})));
+ }
+}
+
+TEST_F(DistinctStreamingAggOperatorTest, test2) {
+ op->_is_streaming_preagg = false;
+ op->_limit = 3;
+ create_op({std::make_shared<DataTypeInt64>()},
{std::make_shared<DataTypeInt64>()});
+
+ mock_op->_outout_blocks.push_back(
+ ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4,
1, 2, 3, 4}));
+
+ {
+ bool eos = false;
+ Block block;
+
+ auto st = op->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ EXPECT_TRUE(eos);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+ }
+}
+
+TEST_F(DistinctStreamingAggOperatorTest, test3) {
+ // batch size = 10
+ op->_is_streaming_preagg = true;
+
+ create_op({std::make_shared<DataTypeInt64>()},
{std::make_shared<DataTypeInt64>()});
+
+ {
+ auto block =
+ ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4});
+ EXPECT_TRUE(op->push(state.get(), &block, false));
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 4);
+ EXPECT_TRUE(op->need_more_input_data(state.get()));
+ }
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt64>({5, 6, 7, 8});
+ EXPECT_TRUE(op->push(state.get(), &block, false));
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 8);
+ EXPECT_TRUE(op->need_more_input_data(state.get()));
+ }
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt64>({9, 10, 11,
12});
+ EXPECT_TRUE(op->push(state.get(), &block, false));
+ EXPECT_EQ(local_state->_cache_block.rows(), 2);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 10);
+ EXPECT_FALSE(op->need_more_input_data(state.get()));
+ }
+
+ {
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(op->pull(state.get(), &block, &eos));
+ EXPECT_FALSE(eos);
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 2);
+ }
+ {
+ local_state->_stop_emplace_flag = true;
+ auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15});
+ EXPECT_TRUE(op->push(state.get(), &block, false));
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 5);
+ EXPECT_FALSE(op->need_more_input_data(state.get()));
+ }
+ {
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(op->pull(state.get(), &block, &eos));
+ EXPECT_FALSE(eos);
+ EXPECT_EQ(block.rows(), 5);
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
+ }
+ {
+ EXPECT_TRUE(op->need_more_input_data(state.get()));
+ local_state->_stop_emplace_flag = true;
+ auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15});
+ EXPECT_TRUE(op->push(state.get(), &block, false));
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 3);
+ EXPECT_FALSE(op->need_more_input_data(state.get()));
+ }
+ {
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(op->pull(state.get(), &block, &eos));
+ EXPECT_FALSE(eos);
+ EXPECT_EQ(block.rows(), 3);
+ EXPECT_EQ(local_state->_cache_block.rows(), 0);
+ EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
+ }
+ { EXPECT_TRUE(op->close(state.get())); }
+}
+
+} // namespace doris::pipeline
diff --git a/be/test/testutil/mock/mock_descriptors.h
b/be/test/testutil/mock/mock_descriptors.h
index ffb37edf424..198c821dbe9 100644
--- a/be/test/testutil/mock/mock_descriptors.h
+++ b/be/test/testutil/mock/mock_descriptors.h
@@ -73,6 +73,7 @@ public:
auto* tuple_desc = pool->add(new MockTupleDescriptor());
tuple_desc->Slots = slots;
tuple_descriptors.push_back(tuple_desc);
+ _tuple_desc_map[0] = tuple_desc;
}
MOCK_METHOD(std::vector<TupleDescriptor*>, get_tuple_descs, (), (const));
diff --git a/be/test/testutil/mock/mock_slot_ref.cpp
b/be/test/testutil/mock/mock_slot_ref.cpp
index 3b556b8ee5c..e758fb5e893 100644
--- a/be/test/testutil/mock/mock_slot_ref.cpp
+++ b/be/test/testutil/mock/mock_slot_ref.cpp
@@ -45,6 +45,17 @@ VExprContextSPtr MockSlotRef::create_mock_context(int
column_id, DataTypePtr dat
return ctx;
}
+VExprContextSPtrs MockSlotRef::create_mock_contexts(DataTypes data_types) {
+ VExprContextSPtrs ctxs;
+ for (int i = 0; i < data_types.size(); i++) {
+ auto ctx =
VExprContext::create_shared(std::make_shared<MockSlotRef>(i, data_types[i]));
+ ctx->_prepared = true;
+ ctx->_opened = true;
+ ctxs.push_back(ctx);
+ }
+ return ctxs;
+}
+
TEST(MockSlotRefTest, test) {
auto old_ctx =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
diff --git a/be/test/testutil/mock/mock_slot_ref.h
b/be/test/testutil/mock/mock_slot_ref.h
index 8e47fca38bf..79df5b67d58 100644
--- a/be/test/testutil/mock/mock_slot_ref.h
+++ b/be/test/testutil/mock/mock_slot_ref.h
@@ -68,6 +68,8 @@ public:
static VExprContextSPtr create_mock_context(int column_id, DataTypePtr
data_type);
+ static VExprContextSPtrs create_mock_contexts(DataTypes data_types);
+
private:
const std::string _name = "MockSlotRef";
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]