This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77d75e24418 [test](case) add some test case about partition sort hash
types (#50318)
77d75e24418 is described below
commit 77d75e244187713ff148f58638431d3449bc790c
Author: zhangstar333 <[email protected]>
AuthorDate: Wed Apr 30 14:17:04 2025 +0800
[test](case) add some test case about partition sort hash types (#50318)
### What problem does this PR solve?
Add some test cases for the partition sort hash types.
---
be/src/pipeline/common/partition_sort_utils.cpp | 5 -
be/src/pipeline/common/partition_sort_utils.h | 9 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +-
.../pipeline/exec/partition_sort_sink_operator.h | 7 +-
be/test/pipeline/exec/data_queue_test.cpp | 8 +
.../operator/analytic_sink_operator_test.cpp | 182 +++++++++++++++++++
.../operator/partition_sort_sink_operator_test.cpp | 195 ++++++++++++++++++++-
.../query_p0/test_partition_sort_hash_types.out | Bin 0 -> 2361 bytes
.../query_p0/test_partition_sort_hash_types.groovy | 65 +++++++
9 files changed, 453 insertions(+), 20 deletions(-)
diff --git a/be/src/pipeline/common/partition_sort_utils.cpp
b/be/src/pipeline/common/partition_sort_utils.cpp
index c7436d38284..b6fdbd5915e 100644
--- a/be/src/pipeline/common/partition_sort_utils.cpp
+++ b/be/src/pipeline/common/partition_sort_utils.cpp
@@ -38,7 +38,6 @@ Status PartitionBlocks::append_block_by_selector(const
vectorized::Block* input_
}
_blocks.back()->set_columns(std::move(mutable_columns));
_init_rows = _init_rows - selector_rows;
- _total_rows = _total_rows + selector_rows;
_current_input_rows = _current_input_rows + selector_rows;
_selector.clear();
// maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
@@ -48,7 +47,6 @@ Status PartitionBlocks::append_block_by_selector(const
vectorized::Block* input_
create_or_reset_sorter_state();
RETURN_IF_ERROR(do_partition_topn_sort());
_current_input_rows = 0; // reset record
- _do_partition_topn_count++;
}
}
return Status::OK();
@@ -79,7 +77,6 @@ Status PartitionBlocks::do_partition_topn_sort() {
_blocks.clear();
RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
bool current_eos = false;
- size_t current_output_rows = 0;
while (!current_eos) {
// output_block maybe need better way
auto output_block = vectorized::Block::create_unique(
@@ -88,12 +85,10 @@ Status PartitionBlocks::do_partition_topn_sort() {
output_block.get(),
&current_eos));
auto rows = output_block->rows();
if (rows > 0) {
- current_output_rows += rows;
_blocks.emplace_back(std::move(output_block));
}
}
- _topn_filter_rows += (_current_input_rows - current_output_rows);
return Status::OK();
}
diff --git a/be/src/pipeline/common/partition_sort_utils.h
b/be/src/pipeline/common/partition_sort_utils.h
index d88d6ea9c85..381dd3ec42b 100644
--- a/be/src/pipeline/common/partition_sort_utils.h
+++ b/be/src/pipeline/common/partition_sort_utils.h
@@ -72,7 +72,7 @@ public:
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20;
#ifndef NDEBUG
-static constexpr size_t PARTITION_SORT_ROWS_THRESHOLD = 10;
+static constexpr size_t PARTITION_SORT_ROWS_THRESHOLD = 5;
#else
static constexpr size_t PARTITION_SORT_ROWS_THRESHOLD = 20000;
#endif
@@ -103,16 +103,9 @@ public:
return _init_rows <= 0 || _blocks.back()->bytes() >
INITIAL_BUFFERED_BLOCK_BYTES;
}
- size_t get_total_rows() const { return _total_rows; }
- size_t get_topn_filter_rows() const { return _topn_filter_rows; }
- size_t get_do_topn_count() const { return _do_partition_topn_count; }
-
vectorized::IColumn::Selector _selector;
std::vector<std::unique_ptr<vectorized::Block>> _blocks;
- size_t _total_rows = 0;
size_t _current_input_rows = 0;
- size_t _topn_filter_rows = 0;
- size_t _do_partition_topn_count = 0;
int64_t _init_rows = 4096;
bool _is_first_sorter = false;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index f9d9f828c46..ecf6b86d7d3 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -160,8 +160,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
}
// only support one order by column, so need two columns upper and lower
bound
+ _range_between_expr_ctxs.resize(p._range_between_expr_ctxs.size());
_range_result_columns.resize(_range_between_expr_ctxs.size());
- _range_between_expr_ctxs = p._range_between_expr_ctxs;
for (size_t i = 0; i < _range_between_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._range_between_expr_ctxs[i]->clone(state,
_range_between_expr_ctxs[i]));
_range_result_columns[i] =
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 5096890ac01..aae733460c6 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -74,11 +74,14 @@ public:
const TPlanNode& tnode, const DescriptorTbl&
descs);
#ifdef BE_TEST
- PartitionSortSinkOperatorX(ObjectPool* pool, int limit, int
partition_exprs_num)
+ PartitionSortSinkOperatorX(ObjectPool* pool, int limit, int
partition_exprs_num,
+ bool has_global_limit, int
partition_inner_limit)
: _pool(pool),
_limit(limit),
_partition_exprs_num(partition_exprs_num),
- _topn_phase(TPartTopNPhase::ONE_PHASE_GLOBAL) {}
+ _topn_phase(TPartTopNPhase::ONE_PHASE_GLOBAL),
+ _has_global_limit(has_global_limit),
+ _partition_inner_limit(partition_inner_limit) {}
#endif
Status init(const TDataSink& tsink) override {
diff --git a/be/test/pipeline/exec/data_queue_test.cpp
b/be/test/pipeline/exec/data_queue_test.cpp
index d0751600e15..4b83bfbf84b 100644
--- a/be/test/pipeline/exec/data_queue_test.cpp
+++ b/be/test/pipeline/exec/data_queue_test.cpp
@@ -108,6 +108,14 @@ TEST_F(DataQueueTest, MultiTest) {
output1.join();
EXPECT_EQ(output_count, 150);
+ for (int i = 0; i < 3; i++) {
+ EXPECT_TRUE(data_queue->is_finish(i));
+ }
+ EXPECT_TRUE(data_queue->is_all_finish());
+ data_queue->clear_free_blocks();
+ for (int i = 0; i < 3; i++) {
+ EXPECT_TRUE(data_queue->_free_blocks[i].empty());
+ }
}
// ./run-be-ut.sh --run --filter=DataQueueTest.*
diff --git a/be/test/pipeline/operator/analytic_sink_operator_test.cpp
b/be/test/pipeline/operator/analytic_sink_operator_test.cpp
index 48e2a568b7a..ba80fa7ef0a 100644
--- a/be/test/pipeline/operator/analytic_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/analytic_sink_operator_test.cpp
@@ -633,4 +633,186 @@ TEST_F(AnalyticSinkOperatorTest, AggFunction6) {
std::cout << "######### AggFunction with row_number test end #########" <<
std::endl;
}
+TEST_F(AnalyticSinkOperatorTest, AggFunction7) {
+ int batch_size = 2;
+ Initialize(batch_size);
+ create_operator(true, 1, "sum", {std::make_shared<DataTypeInt64>()});
+ sink->_agg_expr_ctxs.resize(1);
+ sink->_agg_expr_ctxs[0] =
+ MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
+ sink->_partition_by_eq_expr_ctxs =
+ MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
+ sink->_order_by_eq_expr_ctxs =
+ MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
+ TAnalyticWindow temp_window;
+ temp_window.type = TAnalyticWindowType::RANGE;
+ TAnalyticWindowBoundary window_end;
+ window_end.type = TAnalyticWindowBoundaryType::CURRENT_ROW;
+ temp_window.__set_window_end(window_end);
+ create_window_type(false, true, temp_window);
+ sink->_has_range_window = true;
+
+ create_local_state();
+
+ auto sink_data = [&](int row_count, bool eos) {
+ std::vector<int64_t> data_vals;
+ for (int i = 0; i < batch_size; i++) {
+ data_vals.push_back(row_count + i);
+ }
+ vectorized::Block block =
ColumnHelper::create_block<DataTypeInt64>(data_vals);
+ auto st = sink->sink(state.get(), &block, eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ };
+
+ {
+ int row_count = 0;
+ for (int i = 0; i < 5; i++) {
+ sink_data(row_count, i == 4);
+ row_count += batch_size;
+ }
+ }
+
+ auto compare_block_result = [&](int row_count, std::vector<int64_t>
data_vals,
+ std::vector<int64_t> expect_vals) {
+ std::vector<int64_t> expect_vals_tmp;
+ std::vector<int64_t> data_vals_tmp;
+ for (int i = 0; i < batch_size; i++) {
+ data_vals_tmp.push_back(data_vals[i + row_count]);
+ expect_vals_tmp.push_back(expect_vals[i + row_count]);
+ }
+ vectorized::Block block =
ColumnHelper::create_block<DataTypeInt64>({});
+ bool eos = false;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ std::cout << "source get from block is: \n" << block.dump_data() <<
std::endl;
+ std::cout << "block for real result is: \n "
+ << ColumnHelper::create_block<DataTypeInt64>(data_vals_tmp,
expect_vals_tmp)
+ .dump_data()
+ << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block,
ColumnHelper::create_block<DataTypeInt64>(data_vals_tmp, expect_vals_tmp)));
+ };
+
+ {
+ int row_count = 0;
+ std::vector<int64_t> data_vals {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+ std::vector<int64_t> expect_vals {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; //sum
+ for (int i = 0; i < 5; i++) {
+ compare_block_result(row_count, data_vals, expect_vals);
+ row_count += batch_size;
+ }
+ vectorized::Block block2 =
ColumnHelper::create_block<DataTypeInt64>({});
+ bool eos2 = false;
+ auto st2 = source->get_block(state.get(), &block2, &eos2);
+ EXPECT_TRUE(st2.ok()) << st2.msg();
+ EXPECT_EQ(block2.rows(), 0);
+ EXPECT_TRUE(eos2);
+ }
+ Status exec_status = Status::OK();
+ auto st = sink_local_state->close(state.get(), exec_status);
+ EXPECT_TRUE(st.ok());
+ st = source_local_state->close(state.get());
+ EXPECT_TRUE(st.ok());
+ std::cout << "######### AggFunction with row_number test end #########" <<
std::endl;
+}
+
+// range between is not supported by FE, so this test shouldn't be considered
+TEST_F(AnalyticSinkOperatorTest, AggFunction8) {
+ int batch_size = 1;
+ Initialize(batch_size);
+ create_operator(true, 1, "sum", {std::make_shared<DataTypeInt64>()});
+ sink->_agg_expr_ctxs.resize(1);
+ sink->_agg_expr_ctxs[0] =
+ MockSlotRef::create_mock_contexts(2,
std::make_shared<DataTypeInt64>());
+ sink->_partition_by_eq_expr_ctxs =
+ MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
+ sink->_order_by_eq_expr_ctxs =
+ MockSlotRef::create_mock_contexts(1,
std::make_shared<DataTypeInt64>());
+ sink->_range_between_expr_ctxs = MockSlotRef::create_mock_contexts(
+ {std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeInt64>()});
+ TAnalyticWindow temp_window;
+ temp_window.type = TAnalyticWindowType::RANGE;
+ TAnalyticWindowBoundary window_end;
+ window_end.type = TAnalyticWindowBoundaryType::CURRENT_ROW;
+ temp_window.__set_window_end(window_end);
+ create_window_type(true, true, temp_window);
+ sink->_has_range_window = true;
+ create_local_state();
+ // test with row_number agg function and has window:
_get_next_for_unbounded_rows
+ std::vector<int64_t> suppkey = {5, 5, 17, 17, 26, 26, 32, 32, 36, 36,
+ 40, 40, 41, 41, 51, 51, 87, 87, 93, 93};
+ std::vector<int64_t> orderkey = {5, 5, 7, 7, 7, 7, 7, 7, 6, 6, 7, 7, 5, 5,
7, 7, 5, 5, 7, 7};
+ std::vector<int64_t> quantity = {50, 50, 46, 46, 35, 35, 28, 28, 37, 37,
+ 38, 38, 26, 26, 12, 12, 15, 15, 9, 9};
+ std::vector<int64_t> first_value_quantity_A = {100, 100, 0, 92, 35, 35,
28, 28, 37, 37,
+ 38, 38, 26, 26, 12, 12,
15, 15, 9, 9};
+
+ auto sink_data = [&](int row_count, bool eos) {
+ std::vector<int64_t> col1, col2, col3;
+ for (int i = 0; i < batch_size; i++) {
+ col1.push_back(suppkey[row_count + i]);
+ col2.push_back(orderkey[row_count + i]);
+ col3.push_back(quantity[row_count + i]);
+ }
+ vectorized::Block block;
+
+
block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col1));
+
block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col2));
+
block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col3));
+ auto st = sink->sink(state.get(), &block, eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ };
+
+ {
+ int row_count = 0;
+ for (int i = 0; i < 5; i++) {
+ sink_data(row_count, i == 4);
+ row_count += batch_size;
+ }
+ }
+
+ auto compare_block_result = [&](int row_count) {
+ std::vector<int64_t> col1, col2, col3, expect_vals;
+ for (int i = 0; i < batch_size; i++) {
+ col1.push_back(suppkey[row_count + i]);
+ col2.push_back(orderkey[row_count + i]);
+ col3.push_back(quantity[row_count + i]);
+ expect_vals.push_back(first_value_quantity_A[row_count + i]);
+ }
+
+ vectorized::Block block;
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({}));
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({}));
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({}));
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({}));
+ bool eos = false;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ std::cout << "source get from block is: \n" << block.dump_data() <<
std::endl;
+
+ vectorized::Block result_block;
+
result_block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col1));
+
result_block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col2));
+
result_block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col3));
+
result_block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(expect_vals));
+ std::cout << "block for real result is: \n " <<
result_block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(block, result_block));
+ };
+
+ {
+ int row_count = 0;
+ for (int i = 0; i < 5; i++) {
+ compare_block_result(row_count);
+ row_count += batch_size;
+ }
+ vectorized::Block block2 =
ColumnHelper::create_block<DataTypeInt64>({});
+ bool eos2 = false;
+ auto st2 = source->get_block(state.get(), &block2, &eos2);
+ EXPECT_TRUE(st2.ok()) << st2.msg();
+ EXPECT_EQ(block2.rows(), 0);
+ EXPECT_TRUE(eos2);
+ }
+ std::cout << "######### AggFunction with row_number test end #########" <<
std::endl;
+}
+
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/partition_sort_sink_operator_test.cpp
b/be/test/pipeline/operator/partition_sort_sink_operator_test.cpp
index e55e3eab4a7..f01d6194710 100644
--- a/be/test/pipeline/operator/partition_sort_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partition_sort_sink_operator_test.cpp
@@ -88,9 +88,11 @@ struct PartitionSortOperatorTest : public ::testing::Test {
return true;
}
- void test_for_sink_and_source() {
+ void test_for_sink_and_source(int partition_exprs_num = 1, bool
has_global_limit = false,
+ int partition_inner_limit = 0) {
SetUp();
- sink = std::make_unique<PartitionSortSinkOperatorX>(&pool, -1, 1);
+ sink = std::make_unique<PartitionSortSinkOperatorX>(
+ &pool, -1, partition_exprs_num, has_global_limit,
partition_inner_limit);
sink->_is_asc_order = {true};
sink->_nulls_first = {false};
@@ -102,8 +104,10 @@ struct PartitionSortOperatorTest : public ::testing::Test {
sink->_vsort_exec_exprs._ordering_expr_ctxs =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
- sink->_partition_expr_ctxs =
-
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+ if (partition_exprs_num > 0) {
+ sink->_partition_expr_ctxs =
+ MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
+ }
_child_op->_mock_row_desc.reset(
new MockRowDescriptor
{{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
@@ -143,7 +147,43 @@ struct PartitionSortOperatorTest : public ::testing::Test {
{ EXPECT_TRUE(sink_local_state->open(state.get()).ok()); }
{ EXPECT_TRUE(source_local_state->open(state.get()).ok()); }
+ }
+
+ void test_partition_sort(int partition_exprs_num, int topn_num) {
+ std::vector<int64_t> data_val1;
+ for (int j = 0; j < 5; j++) {
+ for (int i = 0; i < 5; i++) {
+ data_val1.push_back(i + 666);
+ }
+ }
+
+ std::vector<int64_t> data_val2;
+ for (int i = 0; i < 6; i++) {
+ data_val2.push_back(i + 666);
+ }
+ vectorized::Block block =
ColumnHelper::create_block<DataTypeInt64>(data_val1);
+ EXPECT_TRUE(sink->sink(state.get(), &block, false));
+ vectorized::Block block2 =
ColumnHelper::create_block<DataTypeInt64>(data_val2);
+ EXPECT_TRUE(sink->sink(state.get(), &block2, true));
+ bool eos = false;
+ Block output_block;
+ EXPECT_TRUE(source->get_block(state.get(), &output_block, &eos).ok());
+
+ if (partition_exprs_num == 0) {
+ std::vector<int64_t> expect_vals;
+ for (int i = 0; i < topn_num; i++) {
+ expect_vals.push_back(data_val1[0]);
+ }
+ vectorized::Block result_block =
ColumnHelper::create_block<DataTypeInt64>(expect_vals);
+ EXPECT_TRUE(ColumnHelper::block_equal(result_block, output_block));
+ EXPECT_EQ(output_block.rows(), topn_num);
+ } else {
+ EXPECT_EQ(output_block.rows(), topn_num);
+ }
+ std::cout << "source get block: \n" << output_block.dump_data() <<
std::endl;
+ }
+ void test_thread_mutex() {
auto sink_func = [&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3,
4});
@@ -174,7 +214,154 @@ struct PartitionSortOperatorTest : public ::testing::Test
{
TEST_F(PartitionSortOperatorTest, test) {
for (int i = 0; i < 100; i++) {
test_for_sink_and_source();
+ test_thread_mutex();
}
}
+TEST_F(PartitionSortOperatorTest, test_no_partition) {
+ int partition_exprs_num = 0;
+ int topn_num = 3;
+ test_for_sink_and_source(partition_exprs_num, true, 3);
+ test_partition_sort(partition_exprs_num, topn_num);
+}
+
+TEST_F(PartitionSortOperatorTest, test_one_partition) {
+ int partition_exprs_num = 1;
+ int topn_num = 3;
+ test_for_sink_and_source(partition_exprs_num, true, 3);
+ test_partition_sort(partition_exprs_num, topn_num);
+}
+
+TEST_F(PartitionSortOperatorTest, TestWithoutKey) {
+ std::vector<vectorized::DataTypePtr> types
{std::make_shared<vectorized::DataTypeUInt32>()};
+ std::unique_ptr<PartitionedHashMapVariants> _variants =
+ std::make_unique<PartitionedHashMapVariants>();
+ _variants->init(types, HashKeyType::without_key);
+
ASSERT_TRUE(std::holds_alternative<std::monostate>(_variants->method_variant));
+}
+
+TEST_F(PartitionSortOperatorTest, TestSerializedKey) {
+ std::vector<vectorized::DataTypePtr> types
{std::make_shared<vectorized::DataTypeString>()};
+ std::unique_ptr<PartitionedHashMapVariants> _variants =
+ std::make_unique<PartitionedHashMapVariants>();
+ _variants->init(types, HashKeyType::serialized);
+
ASSERT_TRUE(std::holds_alternative<vectorized::MethodSerialized<PartitionDataWithStringKey>>(
+ _variants->method_variant));
+}
+
+TEST_F(PartitionSortOperatorTest, TestNumericKeys) {
+ std::vector<vectorized::DataTypePtr> types
{std::make_shared<vectorized::DataTypeUInt32>()};
+ std::unique_ptr<PartitionedHashMapVariants> _variants =
+ std::make_unique<PartitionedHashMapVariants>();
+ // Test int8 key
+ _variants->init(types, HashKeyType::int8_key);
+ auto value = std::holds_alternative<
+ vectorized::MethodOneNumber<vectorized::UInt8,
PartitionData<vectorized::UInt8>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+
+ // Test int16 key
+ _variants->init(types, HashKeyType::int16_key);
+ value = std::holds_alternative<
+ vectorized::MethodOneNumber<vectorized::UInt16,
PartitionData<vectorized::UInt16>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+
+ // Test int32 key
+ _variants->init(types, HashKeyType::int32_key);
+ value = std::holds_alternative<
+ vectorized::MethodOneNumber<vectorized::UInt32,
PartitionData<vectorized::UInt32>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+
+ // Test int64 key
+ _variants->init(types, HashKeyType::int64_key);
+ value = std::holds_alternative<
+ vectorized::MethodOneNumber<vectorized::UInt64,
PartitionData<vectorized::UInt64>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+
+ // Test int128 key
+ _variants->init(types, HashKeyType::int128_key);
+ value = std::holds_alternative<
+ vectorized::MethodOneNumber<vectorized::UInt128,
PartitionData<vectorized::UInt128>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+
+ // Test int256 key
+ _variants->init(types, HashKeyType::int256_key);
+ value = std::holds_alternative<
+ vectorized::MethodOneNumber<vectorized::UInt256,
PartitionData<vectorized::UInt256>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+}
+
+TEST_F(PartitionSortOperatorTest, TestNullableKeys) {
+ auto nullable_type = std::make_shared<vectorized::DataTypeNullable>(
+ std::make_shared<vectorized::DataTypeUInt32>());
+ std::vector<vectorized::DataTypePtr> types {nullable_type};
+ std::unique_ptr<PartitionedHashMapVariants> _variants =
+ std::make_unique<PartitionedHashMapVariants>();
+ // Test nullable int32
+ _variants->init(types, HashKeyType::int32_key);
+ auto value = std::holds_alternative<
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt32,
DataWithNullKey<PartitionData<vectorized::UInt32>>>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value);
+
+ // Test nullable string
+ _variants->init(types, HashKeyType::string_key);
+ auto value2 =
std::holds_alternative<vectorized::MethodSingleNullableColumn<
+
vectorized::MethodStringNoCache<DataWithNullKey<PartitionDataWithShortStringKey>>>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value2);
+
+ // Test not nullable string
+ auto string_type = std::make_shared<vectorized::DataTypeString>();
+ std::vector<vectorized::DataTypePtr> types2 {string_type};
+ _variants->init(types2, HashKeyType::string_key);
+ auto value3 = std::holds_alternative<
+ vectorized::MethodStringNoCache<PartitionDataWithShortStringKey>>(
+ _variants->method_variant);
+ ASSERT_TRUE(value3);
+}
+
+TEST_F(PartitionSortOperatorTest, TestFixedKeys) {
+ std::vector<vectorized::DataTypePtr> types
{std::make_shared<vectorized::DataTypeUInt32>(),
+
std::make_shared<vectorized::DataTypeUInt32>()};
+ std::unique_ptr<PartitionedHashMapVariants> _variants =
+ std::make_unique<PartitionedHashMapVariants>();
+ // Test fixed64
+ _variants->init(types, HashKeyType::fixed64);
+ ASSERT_TRUE(
+
std::holds_alternative<vectorized::MethodKeysFixed<PartitionData<vectorized::UInt64>>>(
+ _variants->method_variant));
+
+ // Test fixed128
+ _variants->init(types, HashKeyType::fixed128);
+ ASSERT_TRUE(
+
std::holds_alternative<vectorized::MethodKeysFixed<PartitionData<vectorized::UInt128>>>(
+ _variants->method_variant));
+
+ // Test fixed136
+ _variants->init(types, HashKeyType::fixed136);
+ ASSERT_TRUE(
+
std::holds_alternative<vectorized::MethodKeysFixed<PartitionData<vectorized::UInt136>>>(
+ _variants->method_variant));
+
+ // Test fixed256
+ _variants->init(types, HashKeyType::fixed256);
+ ASSERT_TRUE(
+
std::holds_alternative<vectorized::MethodKeysFixed<PartitionData<vectorized::UInt256>>>(
+ _variants->method_variant));
+}
+
+TEST_F(PartitionSortOperatorTest, TestInvalidKeyType) {
+ std::vector<vectorized::DataTypePtr> types
{std::make_shared<vectorized::DataTypeUInt32>()};
+ std::unique_ptr<PartitionedHashMapVariants> _variants =
+ std::make_unique<PartitionedHashMapVariants>();
+ ASSERT_THROW(_variants->init(types, static_cast<HashKeyType>(-1)),
Exception);
+}
+
} // namespace doris::pipeline
diff --git a/regression-test/data/query_p0/test_partition_sort_hash_types.out
b/regression-test/data/query_p0/test_partition_sort_hash_types.out
new file mode 100644
index 00000000000..d5033a0a26c
Binary files /dev/null and
b/regression-test/data/query_p0/test_partition_sort_hash_types.out differ
diff --git
a/regression-test/suites/query_p0/test_partition_sort_hash_types.groovy
b/regression-test/suites/query_p0/test_partition_sort_hash_types.groovy
new file mode 100644
index 00000000000..80962cc32cc
--- /dev/null
+++ b/regression-test/suites/query_p0/test_partition_sort_hash_types.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partition_sort_hash_types") {
+ def dbName = "test_partition_sort_hash_types_db"
+ sql "DROP DATABASE IF EXISTS ${dbName}"
+ sql "CREATE DATABASE ${dbName}"
+ sql "USE $dbName"
+ sql "set enable_partition_topn = true;"
+ sql "set enable_decimal256 = true;"
+
+ sql "DROP TABLE IF EXISTS test_partition_sort_hash_types"
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_partition_sort_hash_types` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(40, 6) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ streamLoad {
+ table "test_partition_sort_hash_types"
+ db dbName
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ sql "sync"
+ sql """ delete from test_partition_sort_hash_types where k1 is null; """
+ qt_select_0 """ select * from test_partition_sort_hash_types order by 2;"""
+
+ qt_select_1 """ select * from (select row_number() over(partition by k1
order by k1) as row_num from test_partition_sort_hash_types)t where row_num =
1; """
+ qt_select_2 """ select * from (select row_number() over(partition by k4
order by k1) as row_num from test_partition_sort_hash_types)t where row_num =
1; """
+ qt_select_3 """ select * from (select row_number() over(partition by k13
order by k1) as row_num from test_partition_sort_hash_types)t where row_num =
1; """
+ qt_select_4 """ select * from (select row_number() over(partition by k5
order by k1) as row_num from test_partition_sort_hash_types)t where row_num =
1; """
+ qt_select_5 """ select * from (select row_number() over(partition by
non_nullable(k12) order by k1) as row_num from test_partition_sort_hash_types)t
where row_num = 1; """
+ qt_select_6 """ select * from (select row_number() over(partition by k2,k3
order by k1) as row_num from test_partition_sort_hash_types)t where row_num =
1; """
+ qt_select_7 """ select * from (select row_number() over(partition by
non_nullable(k0),non_nullable(k13) order by k1) as row_num from
test_partition_sort_hash_types)t where row_num = 1; """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]