This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3370aa003ff [Opt](reservoir-sampler) Optimize percentile_reservoir batch aggregation (#64593)
3370aa003ff is described below

commit 3370aa003ff57897b888da77d4bec521a4564e6e
Author: linrrarity <[email protected]>
AuthorDate: Tue Jun 23 14:15:01 2026 +0800

    [Opt](reservoir-sampler) Optimize percentile_reservoir batch aggregation (#64593)
    
    Optimize `percentile_reservoir` aggregation performance by reducing
    per-row aggregate function overhead (new batch and window-range paths
    insert values into the reservoir sampler directly) and by sorting
    reservoir samples with pdqsort instead of std::sort.
    
    before:
    ```sql
    Doris> select percentile_reservoir(FUniqID, 0.9999) from hits_100m;
    +---------------------------------------+
    | percentile_reservoir(FUniqID, 0.9999) |
    +---------------------------------------+
    |                 9.222511254540202e+18 |
    +---------------------------------------+
    1 row in set (1.292 sec)
    ```
    
    after:
    ```sql
    Doris> select percentile_reservoir(FUniqID, 0.9999) from hits_100m;
    +---------------------------------------+
    | percentile_reservoir(FUniqID, 0.9999) |
    +---------------------------------------+
    |                 9.222511254540202e+18 |
    +---------------------------------------+
    1 row in set (0.537 sec)
    ```
---
 .../aggregate_function_percentile_reservoir.h      | 43 +++++++++-
 be/src/util/reservoir_sampler.h                    | 10 ++-
 .../aggregate/agg_percentile_reservoir_test.cpp    | 96 ++++++++++++++++++++++
 .../test_aggregate_all_functions2.groovy           | 19 +++++
 4 files changed, 164 insertions(+), 4 deletions(-)

diff --git a/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h 
b/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
index a396658f2a5..8ce001afc17 100644
--- a/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
+++ b/be/src/exprs/aggregate/aggregate_function_percentile_reservoir.h
@@ -37,6 +37,11 @@ struct QuantileReservoirSampler {
         data.insert(x);
     }
 
+    void add_batch(const double* values, size_t size, const double 
input_level) {
+        this->level = input_level;
+        data.insert_many(values, size);
+    }
+
     void merge(const QuantileReservoirSampler& rhs) {
         level = rhs.level;
         data.merge(rhs.data);
@@ -84,11 +89,42 @@ public:
 
     void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
              Arena&) const override {
-        auto value = assert_cast<const 
ColumnFloat64&>(*columns[0]).get_data()[row_num];
-        auto level = assert_cast<const 
ColumnFloat64&>(*columns[1]).get_data()[0];
+        auto value = assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[0])
+                             .get_data()[row_num];
+        auto level = assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1])
+                             .get_data()[0];
         this->data(place).add(value, level);
     }
 
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena&) const override {
+        const auto& sources =
+                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
+        const auto& levels =
+                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1]);
+        this->data(place).add_batch(sources.get_data().data(), batch_size, 
levels.get_data()[0]);
+    }
+
+    void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena&, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const 
override {
+        frame_start = std::max<int64_t>(frame_start, partition_start);
+        frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (frame_start < frame_end) {
+            const auto& sources =
+                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
+            const auto& levels =
+                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1]);
+            this->data(place).add_batch(sources.get_data().data() + 
frame_start,
+                                        frame_end - frame_start, 
levels.get_data()[0]);
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        } else if (!*could_use_previous_result) {
+            *use_null_result = true;
+        }
+    }
+
     void reset(AggregateDataPtr place) const override { 
this->data(place).reset(); }
 
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
@@ -106,7 +142,8 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        
assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get());
+        assert_cast<ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+                this->data(place).get());
     }
 };
 
diff --git a/be/src/util/reservoir_sampler.h b/be/src/util/reservoir_sampler.h
index 7eab494bf36..a54a2cf89b0 100644
--- a/be/src/util/reservoir_sampler.h
+++ b/be/src/util/reservoir_sampler.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <pdqsort.h>
+
 #include <cmath>
 #include <cstddef>
 #include <functional>
@@ -314,6 +316,12 @@ public:
         }
     }
 
+    void insert_many(const double* values, size_t size) {
+        for (size_t i = 0; i < size; ++i) {
+            insert(values[i]);
+        }
+    }
+
     void clear() {
         samples.clear();
         sorted = false;
@@ -446,7 +454,7 @@ private:
             return;
         }
         sorted = true;
-        std::sort(samples.begin(), samples.end(), std::less<double>());
+        pdqsort(samples.begin(), samples.end(), std::less<double>());
     }
 };
 
diff --git a/be/test/exprs/aggregate/agg_percentile_reservoir_test.cpp 
b/be/test/exprs/aggregate/agg_percentile_reservoir_test.cpp
new file mode 100644
index 00000000000..594b136d192
--- /dev/null
+++ b/be/test/exprs/aggregate/agg_percentile_reservoir_test.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_const.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+
+namespace doris {
+
+namespace {
+
+AggregateFunctionPtr create_percentile_reservoir_function(bool is_window = 
false) {
+    return AggregateFunctionSimpleFactory::instance().get(
+            "percentile_reservoir",
+            {std::make_shared<DataTypeFloat64>(), 
std::make_shared<DataTypeFloat64>()},
+            std::make_shared<DataTypeFloat64>(), false, 
BeExecVersionManager::get_newest_version(),
+            {.is_window_function = is_window, .column_names = {}});
+}
+
+ColumnWithTypeAndName create_value_block(const std::vector<double>& values) {
+    auto value_column = ColumnFloat64::create();
+    for (double value : values) {
+        value_column->insert_value(value);
+    }
+    return {std::move(value_column), std::make_shared<DataTypeFloat64>(), 
"value"};
+}
+
+double read_result(AggregateFunctionPtr fn, AggregateDataPtr place) {
+    auto result_column = ColumnFloat64::create();
+    fn->insert_result_into(place, *result_column);
+    return result_column->get_element(0);
+}
+
+} // namespace
+
+TEST(AggregateFunctionPercentileReservoirTest, optimized_single_place_paths) {
+    auto fn = create_percentile_reservoir_function();
+    ASSERT_TRUE(fn != nullptr);
+
+    std::vector<ColumnWithTypeAndName> arguments;
+    arguments.emplace_back(create_value_block({1.0, 2.0, 3.0, 4.0}));
+    arguments.emplace_back(create_value_block({0.5, 0.5, 0.5, 0.5}));
+
+    Arena arena;
+    std::unique_ptr<char[]> place_mem(new char[fn->size_of_data()]);
+    AggregateDataPtr place = place_mem.get();
+    fn->create(place);
+
+    const IColumn* columns[] = {arguments[0].column.get(), 
arguments[1].column.get()};
+
+    fn->add_batch_single_place(4, place, columns, arena);
+    EXPECT_DOUBLE_EQ(read_result(fn, place), 2.5);
+
+    fn->reset(place);
+
+    UInt8 use_null_result = false;
+    UInt8 could_use_previous_result = false;
+    fn->add_range_single_place(0, 4, 1, 3, place, columns, arena, 
&use_null_result,
+                               &could_use_previous_result);
+    EXPECT_FALSE(use_null_result);
+    EXPECT_TRUE(could_use_previous_result);
+    EXPECT_DOUBLE_EQ(read_result(fn, place), 2.5);
+
+    fn->reset(place);
+    use_null_result = false;
+    could_use_previous_result = false;
+    fn->add_range_single_place(0, 4, 4, 4, place, columns, arena, 
&use_null_result,
+                               &could_use_previous_result);
+    EXPECT_TRUE(use_null_result);
+    EXPECT_FALSE(could_use_previous_result);
+
+    fn->destroy(place);
+}
+
+} // namespace doris
diff --git 
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
 
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
index d150b976d03..bd26b8cf2e0 100644
--- 
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
+++ 
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
@@ -90,6 +90,25 @@ suite("test_aggregate_all_functions2") {
     qt_select_percentile_reservoir4 """ select percentile_reservoir(k8,0.99) 
from baseall group by k6 order by 1; """ 
     qt_select_percentile_reservoir4 """ select percentile_reservoir(k1,1) from 
baseall; """ 
     qt_select_percentile_reservoir5 """ select percentile_reservoir(k1,0.5) 
over(partition by k6) from baseall order by k1; """
+    sql """
+        select percentile_reservoir(cast(number as double), 0.5)
+        from numbers("number" = "200000")
+        group by number % 16
+    """
+    sql """
+        select percentile_reservoir(cast(number as double), 0.5)
+        from numbers("number" = "200000")
+        group by number - number
+        limit 1
+    """
+    sql """
+        select percentile_reservoir(cast(number as double), 0.5) over (
+            partition by number % 2
+            order by number
+            rows between 1 following and 1 following)
+        from numbers("number" = "4096")
+        order by number
+    """
     sql """
         select percentile_reservoir(number,0.5) from numbers("number" = 
"1000000");
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to