This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8f696e0ecaf [fix](be) Preserve collect aggregate limit during merge
(#63361)
8f696e0ecaf is described below
commit 8f696e0ecaf400ceb4fa0895f3059aa8b6b9a444
Author: Jerry Hu <[email protected]>
AuthorDate: Tue May 19 11:42:15 2026 +0800
[fix](be) Preserve collect aggregate limit during merge (#63361)
Problem Summary: collect_list and collect_set aggregate states with a
limit could lose their initialized max_size during merge because several
merge paths unconditionally overwrote the left state's max_size with the
right state's max_size. If the right state was still uninitialized
(max_size == -1), later merge/add operations could ignore the requested
limit.
### Release note
None
### Check List (For Author)
- Test: Unit Test
- build-support/clang-format.sh
- build-support/check-format.sh
- ./run-be-ut.sh --run
--filter=VAggCollectTest.*:AggregateFunctionCollectTest.*
- build-support/run-clang-tidy.sh --build-dir be/ut_build_ASAN (failed:
local clang-tidy could not analyze due to missing stddef.h in the
toolchain include path and an existing unmatched NOLINTEND diagnostic in
be/src/core/types.h)
- Behavior changed: Yes. collect_list/collect_set merge preserves an
initialized max_size limit instead of overwriting it with an
uninitialized right-side state.
- Does this need documentation: No
---
.../exprs/aggregate/aggregate_function_collect.h | 9 +--
be/test/exprs/aggregate/agg_collect_test.cpp | 72 ++++++++++++++++++++--
2 files changed, 70 insertions(+), 11 deletions(-)
diff --git a/be/src/exprs/aggregate/aggregate_function_collect.h
b/be/src/exprs/aggregate/aggregate_function_collect.h
index b78e86b0fc4..cd787627ef9 100644
--- a/be/src/exprs/aggregate/aggregate_function_collect.h
+++ b/be/src/exprs/aggregate/aggregate_function_collect.h
@@ -17,11 +17,10 @@
#pragma once
-#include <assert.h>
#include <glog/logging.h>
-#include <string.h>
#include <cstddef>
+#include <cstring>
#include <limits>
#include <memory>
#include <new>
@@ -137,7 +136,6 @@ struct AggregateFunctionCollectSetData<T, HasLimit> {
if (max_size == -1) {
max_size = rhs.max_size;
}
- max_size = rhs.max_size;
for (const auto& rhs_elem : rhs.data_set) {
if constexpr (HasLimit) {
@@ -205,7 +203,6 @@ struct AggregateFunctionCollectListData {
if (max_size == -1) {
max_size = rhs.max_size;
}
- max_size = rhs.max_size;
for (auto& rhs_elem : rhs.data) {
if (size() >= max_size) {
return;
@@ -237,7 +234,7 @@ struct AggregateFunctionCollectListData {
auto& vec = assert_cast<ColVecType&>(to).get_data();
size_t old_size = vec.size();
vec.resize(old_size + size());
- memcpy(vec.data() + old_size, data.data(), size() *
sizeof(ElementType));
+ std::memcpy(vec.data() + old_size, data.data(), size() *
sizeof(ElementType));
}
};
@@ -263,7 +260,6 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
if (max_size == -1) {
max_size = rhs.max_size;
}
- max_size = rhs.max_size;
data->insert_range_from(*rhs.data, 0,
std::min(static_cast<size_t>(max_size -
size()), rhs.size()));
@@ -332,7 +328,6 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
if (max_size == -1) {
max_size = rhs.max_size;
}
- max_size = rhs.max_size;
column_data->insert_range_from(
*rhs.column_data, 0,
diff --git a/be/test/exprs/aggregate/agg_collect_test.cpp
b/be/test/exprs/aggregate/agg_collect_test.cpp
index 3fb0c8c020c..d5394ab86c4 100644
--- a/be/test/exprs/aggregate/agg_collect_test.cpp
+++ b/be/test/exprs/aggregate/agg_collect_test.cpp
@@ -17,9 +17,9 @@
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
-#include <stddef.h>
-#include <stdint.h>
+#include <cstddef>
+#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
@@ -40,6 +40,7 @@
#include "core/types.h"
#include "exprs/aggregate/agg_function_test.h"
#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_collect.h"
#include "exprs/aggregate/aggregate_function_simple_factory.h"
#include "gtest/gtest_pred_impl.h"
@@ -53,12 +54,12 @@ void
register_aggregate_function_collect_list(AggregateFunctionSimpleFactory& fa
class VAggCollectTest : public testing::Test {
public:
- void SetUp() {
+ void SetUp() override {
AggregateFunctionSimpleFactory factory =
AggregateFunctionSimpleFactory::instance();
register_aggregate_function_collect_list(factory);
}
- void TearDown() {}
+ void TearDown() override {}
bool is_distinct(const std::string& fn_name) { return fn_name ==
"collect_set"; }
@@ -217,6 +218,69 @@ TEST_F(VAggCollectTest, test_complex_data_type) {
test_agg_collect<DataTypeString>("array_agg", 5, true);
}
+TEST_F(VAggCollectTest, test_merge_preserve_initialized_max_size) {
+ {
+ const DataTypes argument_types {std::make_shared<DataTypeInt32>()};
+ AggregateFunctionCollectListData<TYPE_INT, true> lhs(argument_types);
+ AggregateFunctionCollectListData<TYPE_INT, true> rhs(argument_types);
+ lhs.max_size = 2;
+ lhs.data.push_back(1);
+ lhs.data.push_back(2);
+ rhs.data.push_back(3);
+ rhs.data.push_back(4);
+
+ lhs.merge(rhs);
+
+ EXPECT_EQ(lhs.max_size, 2);
+ EXPECT_EQ(lhs.size(), 2);
+ }
+
+ {
+ const DataTypes argument_types {std::make_shared<DataTypeString>()};
+ AggregateFunctionCollectSetData<TYPE_STRING, true> lhs(argument_types);
+ AggregateFunctionCollectSetData<TYPE_STRING, true> rhs(argument_types);
+ lhs.max_size = 1;
+ lhs.data_set.insert(StringRef("lhs", sizeof("lhs") - 1));
+ rhs.data_set.insert(StringRef("rhs", sizeof("rhs") - 1));
+ Arena arena;
+
+ lhs.merge(rhs, arena);
+
+ EXPECT_EQ(lhs.max_size, 1);
+ EXPECT_EQ(lhs.size(), 1);
+ }
+
+ {
+ const DataTypes argument_types {std::make_shared<DataTypeString>()};
+ AggregateFunctionCollectListData<TYPE_STRING, true>
lhs(argument_types);
+ AggregateFunctionCollectListData<TYPE_STRING, true>
rhs(argument_types);
+ lhs.max_size = 1;
+ lhs.data->insert_data("lhs", sizeof("lhs") - 1);
+ rhs.data->insert_data("rhs", sizeof("rhs") - 1);
+
+ lhs.merge(rhs);
+
+ EXPECT_EQ(lhs.max_size, 1);
+ EXPECT_EQ(lhs.size(), 1);
+ }
+
+ {
+ const DataTypePtr nested_type = std::make_shared<DataTypeInt32>();
+ const DataTypes argument_types {
+ std::make_shared<DataTypeArray>(make_nullable(nested_type))};
+ AggregateFunctionCollectListData<TYPE_ARRAY, true> lhs(argument_types);
+ AggregateFunctionCollectListData<TYPE_ARRAY, true> rhs(argument_types);
+ lhs.max_size = 1;
+ lhs.column_data->insert_default();
+ rhs.column_data->insert_default();
+
+ lhs.merge(rhs);
+
+ EXPECT_EQ(lhs.max_size, 1);
+ EXPECT_EQ(lhs.size(), 1);
+ }
+}
+
struct AggregateFunctionCollectTest : public AggregateFunctiontest {};
TEST_F(AggregateFunctionCollectTest, test_collect_list_aint64) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]