This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3370aa003ff [Opt](reservoir-sampler) Optimize percentile_reservoir
batch aggregation (#64593)
3370aa003ff is described below
commit 3370aa003ff57897b888da77d4bec521a4564e6e
Author: linrrarity <[email protected]>
AuthorDate: Tue Jun 23 14:15:01 2026 +0800
[Opt](reservoir-sampler) Optimize percentile_reservoir batch aggregation
(#64593)
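Optimize `percentile_reservoir` aggregation performance by reducing
per-row aggregate function overhead (input values are now passed to the
sampler in batches through `add_batch_single_place` and
`add_range_single_place`) and by sorting reservoir samples with
`pdqsort` instead of `std::sort`.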
Before this change:
```sql
Doris> select percentile_reservoir(FUniqID, 0.9999) from hits_100m;
+---------------------------------------+
| percentile_reservoir(FUniqID, 0.9999) |
+---------------------------------------+
| 9.222511254540202e+18 |
+---------------------------------------+
1 row in set (1.292 sec)
```
After this change:
```sql
Doris> select percentile_reservoir(FUniqID, 0.9999) from hits_100m;
+---------------------------------------+
| percentile_reservoir(FUniqID, 0.9999) |
+---------------------------------------+
| 9.222511254540202e+18 |
+---------------------------------------+
1 row in set (0.537 sec)
```
---
.../aggregate_function_percentile_reservoir.h | 43 +++++++++-
be/src/util/reservoir_sampler.h | 10 ++-
.../aggregate/agg_percentile_reservoir_test.cpp | 96 ++++++++++++++++++++++
.../test_aggregate_all_functions2.groovy | 19 +++++
4 files changed, 164 insertions(+), 4 deletions(-)
diff --git a/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
b/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
index a396658f2a5..8ce001afc17 100644
--- a/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
+++ b/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
@@ -37,6 +37,11 @@ struct QuantileReservoirSampler {
data.insert(x);
}
+ void add_batch(const double* values, size_t size, const double
input_level) {
+ this->level = input_level;
+ data.insert_many(values, size);
+ }
+
void merge(const QuantileReservoirSampler& rhs) {
level = rhs.level;
data.merge(rhs.data);
@@ -84,11 +89,42 @@ public:
void add(AggregateDataPtr __restrict place, const IColumn** columns,
ssize_t row_num,
Arena&) const override {
- auto value = assert_cast<const
ColumnFloat64&>(*columns[0]).get_data()[row_num];
- auto level = assert_cast<const
ColumnFloat64&>(*columns[1]).get_data()[0];
+ auto value = assert_cast<const ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(*columns[0])
+ .get_data()[row_num];
+ auto level = assert_cast<const ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(*columns[1])
+ .get_data()[0];
this->data(place).add(value, level);
}
+ void add_batch_single_place(size_t batch_size, AggregateDataPtr place,
const IColumn** columns,
+ Arena&) const override {
+ const auto& sources =
+ assert_cast<const ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(*columns[0]);
+ const auto& levels =
+ assert_cast<const ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(*columns[1]);
+ this->data(place).add_batch(sources.get_data().data(), batch_size,
levels.get_data()[0]);
+ }
+
+ void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
+ Arena&, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
+ frame_start = std::max<int64_t>(frame_start, partition_start);
+ frame_end = std::min<int64_t>(frame_end, partition_end);
+ if (frame_start < frame_end) {
+ const auto& sources =
+ assert_cast<const ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(*columns[0]);
+ const auto& levels =
+ assert_cast<const ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(*columns[1]);
+ this->data(place).add_batch(sources.get_data().data() +
frame_start,
+ frame_end - frame_start,
levels.get_data()[0]);
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ } else if (!*could_use_previous_result) {
+ *use_null_result = true;
+ }
+ }
+
void reset(AggregateDataPtr place) const override {
this->data(place).reset(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
@@ -106,7 +142,8 @@ public:
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
-
assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get());
+ assert_cast<ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+ this->data(place).get());
}
};
diff --git a/be/src/util/reservoir_sampler.h b/be/src/util/reservoir_sampler.h
index 7eab494bf36..a54a2cf89b0 100644
--- a/be/src/util/reservoir_sampler.h
+++ b/be/src/util/reservoir_sampler.h
@@ -20,6 +20,8 @@
#pragma once
+#include <pdqsort.h>
+
#include <cmath>
#include <cstddef>
#include <functional>
@@ -314,6 +316,12 @@ public:
}
}
+ void insert_many(const double* values, size_t size) {
+ for (size_t i = 0; i < size; ++i) {
+ insert(values[i]);
+ }
+ }
+
void clear() {
samples.clear();
sorted = false;
@@ -446,7 +454,7 @@ private:
return;
}
sorted = true;
- std::sort(samples.begin(), samples.end(), std::less<double>());
+ pdqsort(samples.begin(), samples.end(), std::less<double>());
}
};
diff --git a/be/test/exprs/aggregate/agg_percentile_reservoir_test.cpp
b/be/test/exprs/aggregate/agg_percentile_reservoir_test.cpp
new file mode 100644
index 00000000000..594b136d192
--- /dev/null
+++ b/be/test/exprs/aggregate/agg_percentile_reservoir_test.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_const.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+
+namespace doris {
+
+namespace {
+
+AggregateFunctionPtr create_percentile_reservoir_function(bool is_window =
false) {
+ return AggregateFunctionSimpleFactory::instance().get(
+ "percentile_reservoir",
+ {std::make_shared<DataTypeFloat64>(),
std::make_shared<DataTypeFloat64>()},
+ std::make_shared<DataTypeFloat64>(), false,
BeExecVersionManager::get_newest_version(),
+ {.is_window_function = is_window, .column_names = {}});
+}
+
+ColumnWithTypeAndName create_value_block(const std::vector<double>& values) {
+ auto value_column = ColumnFloat64::create();
+ for (double value : values) {
+ value_column->insert_value(value);
+ }
+ return {std::move(value_column), std::make_shared<DataTypeFloat64>(),
"value"};
+}
+
+double read_result(AggregateFunctionPtr fn, AggregateDataPtr place) {
+ auto result_column = ColumnFloat64::create();
+ fn->insert_result_into(place, *result_column);
+ return result_column->get_element(0);
+}
+
+} // namespace
+
+TEST(AggregateFunctionPercentileReservoirTest, optimized_single_place_paths) {
+ auto fn = create_percentile_reservoir_function();
+ ASSERT_TRUE(fn != nullptr);
+
+ std::vector<ColumnWithTypeAndName> arguments;
+ arguments.emplace_back(create_value_block({1.0, 2.0, 3.0, 4.0}));
+ arguments.emplace_back(create_value_block({0.5, 0.5, 0.5, 0.5}));
+
+ Arena arena;
+ std::unique_ptr<char[]> place_mem(new char[fn->size_of_data()]);
+ AggregateDataPtr place = place_mem.get();
+ fn->create(place);
+
+ const IColumn* columns[] = {arguments[0].column.get(),
arguments[1].column.get()};
+
+ fn->add_batch_single_place(4, place, columns, arena);
+ EXPECT_DOUBLE_EQ(read_result(fn, place), 2.5);
+
+ fn->reset(place);
+
+ UInt8 use_null_result = false;
+ UInt8 could_use_previous_result = false;
+ fn->add_range_single_place(0, 4, 1, 3, place, columns, arena,
&use_null_result,
+ &could_use_previous_result);
+ EXPECT_FALSE(use_null_result);
+ EXPECT_TRUE(could_use_previous_result);
+ EXPECT_DOUBLE_EQ(read_result(fn, place), 2.5);
+
+ fn->reset(place);
+ use_null_result = false;
+ could_use_previous_result = false;
+ fn->add_range_single_place(0, 4, 4, 4, place, columns, arena,
&use_null_result,
+ &could_use_previous_result);
+ EXPECT_TRUE(use_null_result);
+ EXPECT_FALSE(could_use_previous_result);
+
+ fn->destroy(place);
+}
+
+} // namespace doris
diff --git
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
index d150b976d03..bd26b8cf2e0 100644
---
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
+++
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
@@ -90,6 +90,25 @@ suite("test_aggregate_all_functions2") {
qt_select_percentile_reservoir4 """ select percentile_reservoir(k8,0.99)
from baseall group by k6 order by 1; """
qt_select_percentile_reservoir4 """ select percentile_reservoir(k1,1) from
baseall; """
qt_select_percentile_reservoir5 """ select percentile_reservoir(k1,0.5)
over(partition by k6) from baseall order by k1; """
+ sql """
+ select percentile_reservoir(cast(number as double), 0.5)
+ from numbers("number" = "200000")
+ group by number % 16
+ """
+ sql """
+ select percentile_reservoir(cast(number as double), 0.5)
+ from numbers("number" = "200000")
+ group by number - number
+ limit 1
+ """
+ sql """
+ select percentile_reservoir(cast(number as double), 0.5) over (
+ partition by number % 2
+ order by number
+ rows between 1 following and 1 following)
+ from numbers("number" = "4096")
+ order by number
+ """
sql """
select percentile_reservoir(number,0.5) from numbers("number" =
"1000000");
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]