This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new ebd75c9d08e branch-4.1: Add datasketches HLL sketch aggregate 
functions #63143 (#63911)
ebd75c9d08e is described below

commit ebd75c9d08ef51397573bdafc916f0f867bb91b8
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 1 10:06:41 2026 +0800

    branch-4.1: Add datasketches HLL sketch aggregate functions #63143 (#63911)
    
    Cherry-picked from #63143
    
    Co-authored-by: nooneuse <[email protected]>
    Co-authored-by: yuanyuhao <[email protected]>
---
 .gitmodules                                        |    3 +
 ...gregate_function_datasketches_hll_union_agg.cpp |   44 +
 ...aggregate_function_datasketches_hll_union_agg.h |  243 +++++
 .../aggregate_function_simple_factory.cpp          |    3 +
 .../agg_datasketches_hll_union_agg_test.cpp        | 1097 ++++++++++++++++++++
 build.sh                                           |    9 +
 contrib/datasketches-cpp                           |    1 +
 .../doris/catalog/BuiltinAggregateFunctions.java   |    3 +
 .../functions/agg/DataSketchesHllUnionAgg.java     |  113 ++
 .../visitor/AggregateFunctionVisitor.java          |    5 +
 .../test_datasketches_hll_union_agg.out            |   28 +
 .../test_datasketches_hll_union_agg.groovy         |  170 +++
 run-be-ut.sh                                       |   14 +-
 13 files changed, 1732 insertions(+), 1 deletion(-)

diff --git a/.gitmodules b/.gitmodules
index 54c1a8a3636..eb8e703aa8a 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -29,3 +29,6 @@
        path = contrib/openblas
        url = https://github.com/apache/doris-thirdparty.git
        branch = openblas
+[submodule "contrib/datasketches-cpp"]
+       path = contrib/datasketches-cpp
+       url = https://github.com/apache/datasketches-cpp.git
diff --git 
a/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp 
b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
new file mode 100644
index 00000000000..c9b7013e7a9
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
+
+#include <string>
+
+#include "core/data_type/data_type.h"
+#include "core/data_type/define_primitive_type.h"
+#include "exec/common/hash_table/hash.h" // IWYU pragma: keep
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+namespace doris {
+template <template <PrimitiveType> class Data> // Data: aggregate state template keyed by type
+AggregateFunctionPtr create_aggregate_function_datasketches_hll_union_agg(
+        const std::string& name, const DataTypes& argument_types, const 
DataTypePtr& result_type,
+        const bool result_is_nullable, const AggregateFunctionAttr& attr) {
+    return creator_with_type_list<TYPE_STRING, TYPE_VARCHAR, 
TYPE_VARBINARY>::create<
+            AggregateFunctionDataSketchesHllUnionAgg, Data>(argument_types, 
result_is_nullable,
+                                                            attr);
+} // name and result_type are not used; the creator lists STRING/VARCHAR/VARBINARY only
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory) { // registers the function and two aliases
+    AggregateFunctionCreator creator =
+            
create_aggregate_function_datasketches_hll_union_agg<AggregateFunctionHllSketchData>;
+    factory.register_function_both("datasketches_hll_union_agg", creator);
+    factory.register_alias("datasketches_hll_union_agg", "ds_hll_estimate"); // short alias
+    factory.register_alias("datasketches_hll_union_agg", 
"datasketches_hll_estimate");
+}
+} // namespace doris
diff --git 
a/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h 
b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
new file mode 100644
index 00000000000..d9f82f193e8
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <stddef.h>
+
+#include <DataSketches/hll.hpp>
+#include <algorithm>
+#include <boost/iterator/iterator_facade.hpp>
+#include <memory>
+#include <optional>
+#include <type_traits>
+#include <vector>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "core/assert_cast.h"
+#include "core/column/column.h"
+#include "core/column/column_varbinary.h"
+#include "core/column/column_vector.h"
+#include "core/custom_allocator.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/define_primitive_type.h"
+#include "core/field.h"
+#include "core/string_ref.h"
+#include "core/types.h"
+#include "core/uint128.h"
+#include "exec/common/hash_table/hash.h"
+#include "exec/common/hash_table/phmap_fwd_decl.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "util/var_int.h"
+template <typename T>
+struct HashCRC32;
+namespace doris {
+class Arena;
+class BufferReadable;
+class BufferWritable;
+template <PrimitiveType T>
+class ColumnDecimal;
+/// State of datasketches_hll_union_agg: a lazily created HLL union over the input sketches.
+template <PrimitiveType T>
+struct AggregateFunctionHllSketchData {
+    /** We set the default LgK to 12,
+      * as this value is used as a performance baseline in the relevant 
documentation.
+      * (https://datasketches.apache.org/docs/HLL/HllPerformance.html)
+      */
+    static constexpr uint8_t DEFAULT_LOG_K = 12;
+    using Alloc = CustomStdAllocator<uint8_t>;
+    using Sketch = datasketches::hll_sketch_alloc<Alloc>;
+    using Union = datasketches::hll_union_alloc<Alloc>;
+
+    std::optional<Union> hll_union_data; // empty until the first non-empty sketch is merged
+
+    static String get_name() { return "datasketches_hll_union_agg"; }
+
+    void merge(const Sketch& sketch_data) {
+        if (sketch_data.is_empty()) { // contributes nothing and must not pin the union's lg_k
+            return;
+        }
+        if (!hll_union_data.has_value()) {
+            /** The first non-empty sketch fixes the union's max lg_k, clamped to [7, 21]
+              * (see datasketches-cpp/hll/include/hll.hpp). Empty sketches are skipped
+              * above so the DEFAULT_LOG_K placeholder from write() cannot cap precision.
+              */
+            constexpr uint8_t MIN_UNION_LOG_K = 7;
+            const uint8_t union_lg_k =
+                    std::clamp<uint8_t>(sketch_data.get_lg_config_k(), 
MIN_UNION_LOG_K,
+                                        
datasketches::hll_constants::MAX_LOG_K);
+            hll_union_data.emplace(union_lg_k, Alloc());
+        }
+        try {
+            hll_union_data->update(sketch_data);
+        } catch (const doris::Exception& e) {
+            throw Exception(e.code(), "Internal error happened when update HLL 
sketch: {}",
+                            e.to_string());
+        } catch (const std::exception& e) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when update HLL sketch: 
{}", e.what());
+        } catch (...) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when update HLL sketch: 
unknown exception.");
+        }
+    }
+    void reset() {
+        hll_union_data.reset(); // destroying the union discards everything it accumulated
+    }
+
+    void write_sketch(BufferWritable& buf, const Sketch& sk) const { // compact serialized form
+        auto serialized_bytes = sk.serialize_compact();
+        StringRef d(serialized_bytes.data(), serialized_bytes.size());
+        buf.write_binary(d);
+    }
+    void write(BufferWritable& buf) const {
+        try { // covers the empty-state placeholder too, so every failure is wrapped alike
+            if (!hll_union_data.has_value()) {
+                /** An empty state is written as an empty DEFAULT_LOG_K sketch; merge()
+                  * ignores empty sketches, so this placeholder never affects precision.
+                  */
+                Union u(DEFAULT_LOG_K, Alloc());
+                write_sketch(buf, u.get_result());
+                return;
+            }
+            auto cache = hll_union_data->get_result();
+            write_sketch(buf, cache);
+        } catch (const doris::Exception& e) {
+            throw Exception(e.code(), "Internal error happened when serialize 
HLL sketch: {}",
+                            e.to_string());
+        } catch (const std::exception& e) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when serialize HLL 
sketch: {}", e.what());
+        } catch (...) {
+            throw Exception(
+                    ErrorCode::INTERNAL_ERROR,
+                    "Internal error happened when serialize HLL sketch: 
unknown exception.");
+        }
+    }
+    void read(BufferReadable& buf) { // deserializes one sketch from buf and merges it
+        StringRef d;
+        buf.read_binary(d);
+
+        auto cache = [&]() -> Sketch {
+            try {
+                return Sketch::deserialize(d.data, d.size, Alloc());
+            } catch (const doris::Exception& e) {
+                throw Exception(e.code(), "Failed to deserialize HLL sketch 
when read: {}",
+                                e.to_string());
+            } catch (const std::exception& e) {
+                throw Exception(ErrorCode::CORRUPTION, "HLL sketch data 
corrupted when read: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(ErrorCode::CORRUPTION,
+                                "HLL sketch data corrupted when read: unknown 
exception.");
+            }
+        }();
+
+        merge(cache);
+    }
+    double get_result() const { // estimate of the union; 0.0 when nothing was merged
+        if (hll_union_data.has_value()) {
+            try {
+                return hll_union_data->get_estimate();
+            } catch (const doris::Exception& e) {
+                throw Exception(e.code(),
+                                "Internal error happened when get HLL sketch 
estimate: {}",
+                                e.to_string());
+            } catch (const std::exception& e) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Internal error happened when get HLL sketch 
estimate: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(
+                        ErrorCode::INTERNAL_ERROR,
+                        "Internal error happened when get HLL sketch estimate: 
unknown exception.");
+            }
+        }
+        return 0.0;
+    }
+};
+
+/// Aggregate that unions serialized HLL sketches and returns the estimated distinct count.
+template <PrimitiveType T, typename Data>
+class AggregateFunctionDataSketchesHllUnionAgg final
+        : public IAggregateFunctionDataHelper<Data,
+                                              
AggregateFunctionDataSketchesHllUnionAgg<T, Data>>,
+          UnaryExpression,
+          NotNullableAggregateFunction {
+public:
+    AggregateFunctionDataSketchesHllUnionAgg(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, 
AggregateFunctionDataSketchesHllUnionAgg<T, Data>>(
+                      argument_types_) {}
+    String get_name() const override { return Data::get_name(); } // name comes from the state
+    DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeFloat64>(); }
+    void reset(AggregateDataPtr __restrict place) const override { 
this->data(place).reset(); }
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
+             Arena&) const override { // the arena argument is not used
+        add_one(this->data(place), *columns[0], row_num);
+    }
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        const auto& rhs_data = this->data(rhs);
+        if (!rhs_data.hll_union_data.has_value()) { // rhs holds no union: nothing to merge
+            return;
+        }
+        
this->data(place).merge(rhs_data.hll_union_data->get_result(datasketches::HLL_8));
+    }
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
+        this->data(place).write(buf);
+    }
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        this->data(place).read(buf); // read() merges the decoded sketch into place
+    }
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
+        
assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get_result());
+    }
+
+private:
+    static void ALWAYS_INLINE add_one(Data& data, const IColumn& column, 
ssize_t row_num) {
+        if constexpr (is_string_type(T) || is_varbinary(T)) {
+            const auto& src_column = assert_cast<const typename 
PrimitiveTypeTraits<T>::ColumnType&,
+                                                 
TypeCheckOnRelease::DISABLE>(column);
+            StringRef value = 
src_column.get_data_at(static_cast<size_t>(row_num));
+            if (value.empty()) { // zero-length values are rejected before deserializing
+                throw Exception(ErrorCode::CORRUPTION,
+                                "HLL sketch data corrupted when add: empty 
input.");
+            }
+
+            using Sketch = typename Data::Sketch;
+            using Alloc = typename Data::Alloc;
+
+            auto sketch_data = [&]() -> Sketch {
+                try {
+                    return Sketch::deserialize(value.begin(), value.size, 
Alloc());
+                } catch (const doris::Exception& e) {
+                    throw Exception(e.code(), "Failed to deserialize HLL 
sketch when add: {}",
+                                    e.to_string());
+                } catch (const std::exception& e) {
+                    throw Exception(ErrorCode::CORRUPTION, "HLL sketch data 
corrupted when add: {}",
+                                    e.what());
+                } catch (...) {
+                    throw Exception(ErrorCode::CORRUPTION,
+                                    "HLL sketch data corrupted when add: 
unknown exception.");
+                }
+            }(); // invoked immediately: any deserialize failure becomes a doris::Exception
+
+            data.merge(sketch_data);
+        } // for any other T this function does nothing
+    }
+};
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp 
b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index 9fc311cdf08..6e18e95bee3 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -38,6 +38,8 @@ void 
register_aggregate_function_avg(AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_count(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_count_by_enum(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory& 
factory);
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory);
 void 
register_aggregate_function_uniq_distribute_key(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory);
@@ -128,6 +130,7 @@ AggregateFunctionSimpleFactory& 
AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_replace_reader_load(instance);
         register_aggregate_function_window_lead_lag_first_last(instance);
         register_aggregate_function_HLL_union_agg(instance);
+        register_aggregate_function_datasketches_HLL_union_agg(instance);
         register_aggregate_functions_corr(instance);
         register_aggregate_functions_corr_welford(instance);
         register_aggregate_function_covar_pop(instance);
diff --git a/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp 
b/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
new file mode 100644
index 00000000000..7783ec89e2c
--- /dev/null
+++ b/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
@@ -0,0 +1,1097 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <DataSketches/hll.hpp>
+
+#include "agent/be_exec_version_manager.h"
+#include "common/config.h"
+#include "common/status.h"
+#include "core/column/column.h"
+#include "core/column/column_string.h"
+#include "core/column/column_varbinary.h"
+#include "core/custom_allocator.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_varbinary.h"
+#include "exec/common/hash_table/hash.h"
+#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+
+namespace doris {
+
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory);
+
+class AggregateFunctionDataSketchesHllUnionAggTest : public ::testing::Test {
+protected:
+    void SetUp() override { arena = std::make_unique<Arena>(); } // fresh arena for each test
+
+    void TearDown() override { arena.reset(); }
+
+    std::unique_ptr<Arena> arena; // the tests allocate their aggregate states from it
+};
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testBasicUnion) {
+    // Union two overlapping sketches through add() and check the cardinality estimate.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    // Two sketches of 100 values each: 0..99 and 50..149, i.e. 150 distinct values overall
+    datasketches::hll_sketch sketch1(8); // lg_k=8
+    for (int i = 0; i < 100; i++) {
+        sketch1.update(i);
+    }
+
+    datasketches::hll_sketch sketch2(8);
+    for (int i = 50; i < 150; i++) {
+        sketch2.update(i);
+    }
+
+    // Serialize both sketches into string column
+    auto column_string = ColumnString::create();
+    const auto ser1 = sketch1.serialize_compact();
+    column_string->insert_data((const char*)(ser1.data()), ser1.size());
+    const auto ser2 = sketch2.serialize_compact();
+    column_string->insert_data((const char*)(ser2.data()), ser2.size());
+
+    // Create aggregate data place
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    // Add both rows
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place, columns, 0, *arena);
+    agg_func->add(place, columns, 1, *arena);
+
+    // Get result
+    ColumnFloat64 result_column;
+    agg_func->insert_result_into(place, result_column);
+
+    double estimate = result_column.get_data()[0];
+    EXPECT_GE(estimate, 130.0); // tolerance band around the 150 distinct values
+    EXPECT_LE(estimate, 170.0);
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMergeTwoAggStates) {
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     
AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // Create two separate aggregate states
+    AggregateDataPtr place1 =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place1);
+
+    AggregateDataPtr place2 =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place2);
+
+    // Disjoint inputs: sketch1 holds 0..99 and sketch2 holds 100..199
+    datasketches::hll_sketch sketch1(8);
+    for (int i = 0; i < 100; i++) sketch1.update(i);
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8);
+    for (int i = 100; i < 200; i++) sketch2.update(i);
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)(ser1.data()), ser1.size());
+    column_string->insert_data((const char*)(ser2.data()), ser2.size());
+
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place1, columns, 0, *arena); // row 0 goes to the first state
+    agg_func->add(place2, columns, 1, *arena); // row 1 goes to the second state
+
+    // Merge second state into first
+    agg_func->merge(place1, place2, *arena);
+
+    // Read the estimate from the merged (first) state
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place1, result);
+    double estimate = result.get_data()[0];
+
+    EXPECT_GE(estimate, 170.0);
+    EXPECT_LE(estimate, 230.0); // 200 unique values expected
+
+    agg_func->destroy(place1);
+    agg_func->destroy(place2);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, 
testMergeEmptyStateDoesNotCrash) {
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     
AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    AggregateDataPtr place_with_data =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place_with_data);
+
+    AggregateDataPtr empty_rhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(empty_rhs_place);
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place_with_data, columns, 0, *arena);
+
+    // Covers the "all NULL" style path: rhs exists but never saw add().
+    EXPECT_NO_THROW(agg_func->merge(place_with_data, empty_rhs_place, *arena));
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place_with_data, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], sketch.get_estimate()); // estimate is unchanged
+
+    // An empty string is not a valid serialized sketch and is rejected by add(),
+    // so the empty states used in this test come from create() alone, without any add().
+
+    AggregateDataPtr empty_lhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(empty_lhs_place);
+    EXPECT_NO_THROW(agg_func->merge(empty_lhs_place, empty_rhs_place, *arena));
+
+    ColumnFloat64 empty_merge_result;
+    agg_func->insert_result_into(empty_lhs_place, empty_merge_result);
+    EXPECT_DOUBLE_EQ(empty_merge_result.get_data()[0], 0.0); // empty merged with empty is 0
+
+    agg_func->destroy(place_with_data);
+    agg_func->destroy(empty_rhs_place);
+    agg_func->destroy(empty_lhs_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testEmptyState) {
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place); // created, but no add() or merge() follows
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], 0.0); // a state without input reports 0.0
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testSerializeDeserialize) 
{
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    // Build one sketch over the values 0..99 and feed it to the first state
+    datasketches::hll_sketch sketch(8);
+    for (int i = 0; i < 100; i++) sketch.update(i);
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)(ser.data()), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place, columns, 0, *arena);
+
+    // Serialize the state into a string column used as the byte buffer
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+    agg_func->serialize(place, w);
+    w.commit();
+
+    // Deserialize into new state
+    AggregateDataPtr new_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(new_place);
+
+    BufferReadable r(buffer->get_data_at(0));
+    agg_func->deserialize(new_place, r, *arena);
+
+    // The round-tripped state must report the same estimate as the original one
+    ColumnFloat64 result1, result2;
+    agg_func->insert_result_into(place, result1);
+    agg_func->insert_result_into(new_place, result2);
+
+    EXPECT_DOUBLE_EQ(result1.get_data()[0], result2.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(new_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testReset) {
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch(8);
+    for (int i = 0; i < 100; i++) sketch.update(i);
+    auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)(ser.data()), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place, columns, 0, *arena);
+
+    // Reset the state that already holds one sketch
+    agg_func->reset(place);
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], 0.0); // after reset the state reports 0.0 again
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, 
testResetThenAddReinitializesState) {
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch1(8, datasketches::HLL_8);
+    for (int i = 0; i < 7; ++i) {
+        sketch1.update(i);
+    }
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8, datasketches::HLL_8);
+    for (int i = 10; i < 17; ++i) {
+        sketch2.update(i);
+    }
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser1.data(), ser1.size());
+    column_string->insert_data((const char*)ser2.data(), ser2.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    agg_func->add(place, columns, 0, *arena); // sketch1 (values 0..6)
+    agg_func->reset(place);                   // drops what sketch1 contributed
+    agg_func->add(place, columns, 1, *arena); // sketch2 (values 10..16)
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], sketch2.get_estimate()); // only sketch2 counts
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testVarcharResetThenAddAndMergeEmptyRhsDoesNotCrash) { // TYPE_VARCHAR instantiation
+    DataTypePtr input_type = std::make_shared<DataTypeString>(-1, TYPE_VARCHAR);
+    DataTypes argument_types = {input_type};
+
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_VARCHAR,
+                                                     
AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    datasketches::hll_sketch sketch1(8, datasketches::HLL_8);
+    for (int i = 0; i < 7; ++i) {
+        sketch1.update(i);
+    }
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8, datasketches::HLL_8);
+    for (int i = 10; i < 17; ++i) {
+        sketch2.update(i);
+    }
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser1.data(), ser1.size());
+    column_string->insert_data((const char*)ser2.data(), ser2.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    agg_func->add(place, columns, 0, *arena); // sketch1
+    agg_func->reset(place);                   // drops what sketch1 contributed
+    agg_func->add(place, columns, 1, *arena); // sketch2
+
+    ColumnFloat64 after_reset;
+    agg_func->insert_result_into(place, after_reset);
+    EXPECT_DOUBLE_EQ(after_reset.get_data()[0], sketch2.get_estimate());
+
+    AggregateDataPtr empty_rhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(empty_rhs_place); // never receives add(): stays empty
+
+    EXPECT_NO_THROW(agg_func->merge(place, empty_rhs_place, *arena));
+
+    ColumnFloat64 after_merge;
+    agg_func->insert_result_into(place, after_merge);
+    EXPECT_DOUBLE_EQ(after_merge.get_data()[0], sketch2.get_estimate()); // unchanged by merge
+
+    AggregateDataPtr empty_lhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(empty_lhs_place);
+
+    EXPECT_NO_THROW(agg_func->merge(empty_lhs_place, empty_rhs_place, *arena));
+
+    ColumnFloat64 empty_merge_result;
+    agg_func->insert_result_into(empty_lhs_place, empty_merge_result);
+    EXPECT_DOUBLE_EQ(empty_merge_result.get_data()[0], 0.0); // empty merged with empty is 0
+
+    agg_func->destroy(place);
+    agg_func->destroy(empty_rhs_place);
+    agg_func->destroy(empty_lhs_place);
+}
+
+// The factory must resolve the canonical name and both aliases for a STRING argument, and
+// every resolved function must report the input sketch's estimate for a single-row input.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testFactoryCreateAndAliases) {
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    int newest_version = BeExecVersionManager::get_newest_version();
+
+    DataTypes string_args = {std::make_shared<DataTypeString>()};
+
+    auto canonical_fn = factory.get("datasketches_hll_union_agg", string_args, nullptr, false,
+                                    newest_version);
+    auto short_alias_fn =
+            factory.get("ds_hll_estimate", string_args, nullptr, false, newest_version);
+    auto long_alias_fn = factory.get("datasketches_hll_estimate", string_args, nullptr, false,
+                                     newest_version);
+
+    ASSERT_NE(canonical_fn, nullptr);
+    ASSERT_NE(short_alias_fn, nullptr);
+    ASSERT_NE(long_alias_fn, nullptr);
+
+    datasketches::hll_sketch input_sketch(8, datasketches::HLL_8);
+    for (int value = 0; value < 7; ++value) {
+        input_sketch.update(value);
+    }
+    const auto bytes = input_sketch.serialize_compact();
+
+    auto input_column = ColumnString::create();
+    input_column->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input_column.get()};
+
+    // Feeds row 0 into a fresh state of `fn` and returns the resulting estimate.
+    auto estimate_with = [&](const AggregateFunctionPtr& fn) {
+        AggregateDataPtr state = arena->aligned_alloc(fn->size_of_data(), fn->align_of_data());
+        fn->create(state);
+        fn->add(state, columns, 0, *arena);
+        ColumnFloat64 out;
+        fn->insert_result_into(state, out);
+        fn->destroy(state);
+        return out.get_data()[0];
+    };
+
+    double expected = input_sketch.get_estimate();
+    EXPECT_DOUBLE_EQ(estimate_with(canonical_fn), expected);
+    EXPECT_DOUBLE_EQ(estimate_with(short_alias_fn), expected);
+    EXPECT_DOUBLE_EQ(estimate_with(long_alias_fn), expected);
+}
+
+// Factory behaviour per argument type: VARCHAR (canonical name and alias) must resolve and
+// compute the expected estimate, nullable VARCHAR must resolve, and INT32 must be rejected
+// (the factory returns nullptr).
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testFactoryCreateForVarcharAndNullableAndUnsupportedType) {
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    int newest_version = BeExecVersionManager::get_newest_version();
+
+    DataTypes varchar_args = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto varchar_fn = factory.get("datasketches_hll_union_agg", varchar_args, nullptr, false,
+                                  newest_version);
+    auto varchar_alias_fn =
+            factory.get("ds_hll_estimate", varchar_args, nullptr, false, newest_version);
+    ASSERT_NE(varchar_fn, nullptr);
+    ASSERT_NE(varchar_alias_fn, nullptr);
+
+    datasketches::hll_sketch input_sketch(8, datasketches::HLL_8);
+    for (int value = 0; value < 7; ++value) {
+        input_sketch.update(value);
+    }
+    const auto bytes = input_sketch.serialize_compact();
+
+    auto input_column = ColumnString::create();
+    input_column->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input_column.get()};
+
+    // Feeds row 0 into a fresh state of `fn` and returns the resulting estimate.
+    auto estimate_with = [&](const AggregateFunctionPtr& fn) {
+        AggregateDataPtr state = arena->aligned_alloc(fn->size_of_data(), fn->align_of_data());
+        fn->create(state);
+        fn->add(state, columns, 0, *arena);
+        ColumnFloat64 out;
+        fn->insert_result_into(state, out);
+        fn->destroy(state);
+        return out.get_data()[0];
+    };
+
+    double expected = input_sketch.get_estimate();
+    EXPECT_DOUBLE_EQ(estimate_with(varchar_fn), expected);
+    EXPECT_DOUBLE_EQ(estimate_with(varchar_alias_fn), expected);
+
+    // Nullable VARCHAR: only creation is checked here.
+    DataTypes nullable_varchar_args = {
+            make_nullable(std::make_shared<DataTypeString>(-1, TYPE_VARCHAR))};
+    auto nullable_varchar_fn = factory.get("datasketches_hll_union_agg", nullable_varchar_args,
+                                           nullptr, false, newest_version);
+    ASSERT_NE(nullable_varchar_fn, nullptr);
+
+    // INT32 is not a supported argument type.
+    DataTypes int32_args = {std::make_shared<DataTypeInt32>()};
+    auto int32_fn = factory.get("datasketches_hll_union_agg", int32_args, nullptr, false,
+                                newest_version);
+    EXPECT_EQ(int32_fn, nullptr);
+}
+
+// A sketch built with lgK = 4 must be accepted by both add() and deserialize() without
+// throwing, and both paths must produce the same estimate for the same serialized bytes.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testLowLgKSketchDoesNotReportCorruption) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch(4, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data(reinterpret_cast<const char*>(ser.data()), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    EXPECT_NO_THROW(agg_func->add(place, columns, 0, *arena));
+
+    ColumnFloat64 add_result;
+    agg_func->insert_result_into(place, add_result);
+
+    // 100 distinct values were inserted; only a loose sanity window is asserted here.
+    EXPECT_GE(add_result.get_data()[0], 50);
+    EXPECT_LE(add_result.get_data()[0], 150);
+
+    // Feed the same bytes through the deserialize() path.
+    auto buf = ColumnString::create();
+    BufferWritable w(*buf);
+    StringRef d(reinterpret_cast<const char*>(ser.data()), ser.size());
+    w.write_binary(d);
+    w.commit();
+
+    AggregateDataPtr place2 =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place2);
+
+    BufferReadable r(buf->get_data_at(0));
+    EXPECT_NO_THROW(agg_func->deserialize(place2, r, *arena));
+
+    ColumnFloat64 deserialize_result;
+    agg_func->insert_result_into(place2, deserialize_result);
+    // Floating-point estimates are compared with EXPECT_DOUBLE_EQ, like the other tests here.
+    EXPECT_DOUBLE_EQ(deserialize_result.get_data()[0], add_result.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(place2);
+}
+
+// add() on a zero-length STRING value must throw doris::Exception with code CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAddEmptyStringThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("", 0);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body and skip the
+        // destroy(place) call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// reset() on a state that never received input must not throw, and the state must still
+// report an estimate of 0 afterwards.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testResetOnEmptyState) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes string_args = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(string_args);
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    // Covers the reset() branch taken when union_data is nullptr.
+    EXPECT_NO_THROW(agg_func->reset(state));
+
+    ColumnFloat64 estimate;
+    agg_func->insert_result_into(state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], 0.0);
+
+    agg_func->destroy(state);
+}
+
+// The factory must resolve the function for a VARBINARY argument, and a sketch fed through
+// a ColumnVarbinary must yield that sketch's estimate.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryInput) {
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    int newest_version = BeExecVersionManager::get_newest_version();
+
+    DataTypes varbinary_args = {std::make_shared<DataTypeVarbinary>()};
+    auto varbinary_fn = factory.get("datasketches_hll_union_agg", varbinary_args, nullptr, false,
+                                    newest_version);
+    ASSERT_NE(varbinary_fn, nullptr);
+
+    datasketches::hll_sketch input_sketch(8, datasketches::HLL_8);
+    for (int value = 20; value < 30; ++value) {
+        input_sketch.update(value);
+    }
+    const auto bytes = input_sketch.serialize_compact();
+
+    auto input_column = ColumnVarbinary::create();
+    input_column->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+
+    const IColumn* columns[1] = {input_column.get()};
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(varbinary_fn->size_of_data(), varbinary_fn->align_of_data());
+    varbinary_fn->create(state);
+    varbinary_fn->add(state, columns, 0, *arena);
+
+    ColumnFloat64 estimate;
+    varbinary_fn->insert_result_into(state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], input_sketch.get_estimate());
+
+    varbinary_fn->destroy(state);
+}
+
+// add() on a zero-length VARBINARY value must throw doris::Exception with code CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryAddEmptyStringThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>>(argument_types);
+
+    auto column_varbinary = ColumnVarbinary::create();
+    column_varbinary->insert_data("", 0);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_varbinary.get()};
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body and skip the
+        // destroy(place) call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// A one-byte payload ("x") is not a valid serialized sketch: both add() and deserialize()
+// on the VARBINARY instantiation must throw doris::Exception with code CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryCorruptedInputThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>>(argument_types);
+
+    auto column_varbinary = ColumnVarbinary::create();
+    column_varbinary->insert_data("x", 1);
+    const IColumn* columns[1] = {column_varbinary.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body and skip both the
+        // deserialize() check and the destroy(place) call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // Same invalid payload, this time through the deserialize() path.
+    auto corrupt_buf = ColumnString::create();
+    BufferWritable corrupt_w(*corrupt_buf);
+    StringRef corrupted("x", 1);
+    corrupt_w.write_binary(corrupted);
+    corrupt_w.commit();
+
+    BufferReadable r(corrupt_buf->get_data_at(0));
+    try {
+        agg_func->deserialize(place, r, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// Round trip on the VARBINARY instantiation: a state serialized with serialize() and read
+// back with deserialize() must report the same estimate as the original state.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinarySerializeDeserialize) {
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>;
+    DataTypes varbinary_args = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggFunc>(varbinary_args);
+
+    datasketches::hll_sketch input_sketch(8, datasketches::HLL_8);
+    for (int value = 0; value < 100; ++value) {
+        input_sketch.update(value);
+    }
+    const auto bytes = input_sketch.serialize_compact();
+
+    auto input_column = ColumnVarbinary::create();
+    input_column->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input_column.get()};
+
+    AggregateDataPtr source_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(source_state);
+    agg_func->add(source_state, columns, 0, *arena);
+
+    // Serialize the populated state into a string column ...
+    auto wire = ColumnString::create();
+    BufferWritable writer(*wire);
+    agg_func->serialize(source_state, writer);
+    writer.commit();
+
+    // ... and load it into a second, freshly created state.
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(restored_state);
+
+    BufferReadable reader(wire->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 source_estimate;
+    ColumnFloat64 restored_estimate;
+    agg_func->insert_result_into(source_state, source_estimate);
+    agg_func->insert_result_into(restored_state, restored_estimate);
+
+    EXPECT_DOUBLE_EQ(source_estimate.get_data()[0], restored_estimate.get_data()[0]);
+
+    agg_func->destroy(source_state);
+    agg_func->destroy(restored_state);
+}
+
+// Round trip of a state that never received input: serialize() must not throw, and the
+// state rebuilt by deserialize() must report an estimate of 0.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testSerializeDeserializeEmptyState) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes string_args = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(string_args);
+
+    AggregateDataPtr empty_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_state);
+
+    auto wire = ColumnString::create();
+    BufferWritable writer(*wire);
+    // Covers the empty-state branch of write().
+    EXPECT_NO_THROW(agg_func->serialize(empty_state, writer));
+    writer.commit();
+
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(restored_state);
+
+    BufferReadable reader(wire->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 estimate;
+    agg_func->insert_result_into(restored_state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], 0.0);
+
+    agg_func->destroy(empty_state);
+    agg_func->destroy(restored_state);
+}
+
+// A one-byte payload ("x") is not a valid serialized sketch: both add() and deserialize()
+// on the STRING instantiation must throw doris::Exception with code CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testCorruptedInputThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("x", 1); // definitely not a valid sketch
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    try {
+        agg_func->add(place, columns, 0, *arena); // covers add() CORRUPTION catch branch
+        // Non-fatal on purpose: FAIL() would return from the test body and skip both the
+        // deserialize() check and the destroy(place) call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // Same invalid payload, this time through the deserialize() path.
+    auto corrupt_buf = ColumnString::create();
+    BufferWritable corrupt_w(*corrupt_buf);
+    StringRef corrupted("x", 1);
+    corrupt_w.write_binary(corrupted);
+    corrupt_w.commit();
+
+    BufferReadable r(corrupt_buf->get_data_at(0));
+    try {
+        agg_func->deserialize(place, r, *arena); // covers read() CORRUPTION catch branch
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// Input produced by an hll_sketch_alloc instantiated with doris::CustomStdAllocator must be
+// accepted by add() and yield that sketch's estimate.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAllocatorAwareSketchInput) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    using ByteAllocator = doris::CustomStdAllocator<uint8_t>;
+    using AllocSketch = datasketches::hll_sketch_alloc<ByteAllocator>;
+
+    DataTypes string_args = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(string_args);
+
+    AllocSketch input_sketch(8, datasketches::HLL_8, false, ByteAllocator());
+    for (int value = 0; value < 7; ++value) {
+        input_sketch.update(value);
+    }
+    const auto bytes = input_sketch.serialize_compact();
+
+    auto input_column = ColumnString::create();
+    input_column->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input_column.get()};
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    agg_func->add(state, columns, 0, *arena);
+
+    ColumnFloat64 estimate;
+    agg_func->insert_result_into(state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], input_sketch.get_estimate());
+
+    agg_func->destroy(state);
+}
+
+namespace {
+
+// Scope guard used by the allocation-failure tests below.
+//
+// On construction it stores the current doris::config::mem_alloc_fault_probability, sets
+// it to `probability`, and increments doris::enable_thread_catch_bad_alloc. The destructor
+// undoes both in reverse order.
+// NOTE(review): presumably this makes Doris-managed allocations fail with the given
+// probability and surface as doris::Exception; confirm against the allocator code.
+class ScopedMemAllocFaultInjection {
+public:
+    explicit ScopedMemAllocFaultInjection(double probability)
+            : _saved_probability(doris::config::mem_alloc_fault_probability) {
+        doris::config::mem_alloc_fault_probability = probability;
+        ++doris::enable_thread_catch_bad_alloc;
+    }
+
+    ~ScopedMemAllocFaultInjection() {
+        --doris::enable_thread_catch_bad_alloc;
+        doris::config::mem_alloc_fault_probability = _saved_probability;
+    }
+
+    // Copying would restore the saved probability twice, so it is disabled.
+    ScopedMemAllocFaultInjection(const ScopedMemAllocFaultInjection&) = delete;
+    ScopedMemAllocFaultInjection& operator=(const ScopedMemAllocFaultInjection&) = delete;
+
+private:
+    // Value of mem_alloc_fault_probability observed at construction time.
+    double _saved_probability;
+};
+
+} // namespace
+
+// Checks the metadata accessors: both the data type and the function report the name
+// "datasketches_hll_union_agg", and the return type equals DataTypeFloat64.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMetaInfoCoversInlineMethods) {
+    using SketchData = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING, SketchData>;
+
+    DataTypes string_args = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(string_args);
+
+    EXPECT_EQ(SketchData::get_name(), "datasketches_hll_union_agg");
+    EXPECT_EQ(agg_func->get_name(), "datasketches_hll_union_agg");
+
+    auto result_type = agg_func->get_return_type();
+    EXPECT_TRUE(result_type->equals(*std::make_shared<DataTypeFloat64>()));
+}
+
+// With allocation fault injection at probability 1.0, add() of a valid sketch must throw
+// doris::Exception with code MEM_ALLOC_FAILED.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAddMemAllocFailedThrowsMemAllocFailed) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // The input sketch is built before injection is switched on.
+    datasketches::hll_sketch sketch(12, datasketches::HLL_8);
+    for (int i = 0; i < 1000; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data(reinterpret_cast<const char*>(ser.data()), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            agg_func->add(place, columns, 0, *arena);
+            // Non-fatal on purpose: FAIL() would return from the test body and skip the
+            // destroy(place) call below.
+            ADD_FAILURE() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+        }
+    }
+
+    agg_func->destroy(place);
+}
+
+// With allocation fault injection at probability 1.0, deserialize() of a valid payload
+// must throw doris::Exception with code MEM_ALLOC_FAILED.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testDeserializeMemAllocFailedThrowsMemAllocFailed) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // The payload and the target state are prepared before injection is switched on.
+    datasketches::hll_sketch sketch(12, datasketches::HLL_8);
+    for (int i = 0; i < 1000; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+    StringRef d(reinterpret_cast<const char*>(ser.data()), ser.size());
+    w.write_binary(d);
+    w.commit();
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    BufferReadable r(buffer->get_data_at(0));
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            agg_func->deserialize(place, r, *arena);
+            // Non-fatal on purpose: FAIL() would return from the test body and skip the
+            // destroy(place) call below.
+            ADD_FAILURE() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+        }
+    }
+
+    agg_func->destroy(place);
+}
+
+// add() on a zero-length VARCHAR value must throw doris::Exception with code CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharAddEmptyStringThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("", 0);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body and skip the
+        // destroy(place) call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// A one-byte payload ("x") is not a valid serialized sketch: both add() and deserialize()
+// on the VARCHAR instantiation must throw doris::Exception with code CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharCorruptedInputThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("x", 1);
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body and skip both the
+        // deserialize() check and the destroy(place) call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // Same invalid payload, this time through the deserialize() path.
+    auto corrupt_buf = ColumnString::create();
+    BufferWritable corrupt_w(*corrupt_buf);
+    StringRef corrupted("x", 1);
+    corrupt_w.write_binary(corrupted);
+    corrupt_w.commit();
+
+    BufferReadable r(corrupt_buf->get_data_at(0));
+    try {
+        agg_func->deserialize(place, r, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// Round trip on the VARCHAR instantiation: a state serialized with serialize() and read
+// back with deserialize() must report the same estimate as the original state.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharSerializeDeserialize) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_VARCHAR,
+                                                     AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    DataTypes varchar_args = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggFunc>(varchar_args);
+
+    datasketches::hll_sketch input_sketch(8, datasketches::HLL_8);
+    for (int value = 0; value < 100; ++value) {
+        input_sketch.update(value);
+    }
+    const auto bytes = input_sketch.serialize_compact();
+
+    auto input_column = ColumnString::create();
+    input_column->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input_column.get()};
+
+    AggregateDataPtr source_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(source_state);
+    agg_func->add(source_state, columns, 0, *arena);
+
+    // Serialize the populated state into a string column ...
+    auto wire = ColumnString::create();
+    BufferWritable writer(*wire);
+    agg_func->serialize(source_state, writer);
+    writer.commit();
+
+    // ... and load it into a second, freshly created state.
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(restored_state);
+
+    BufferReadable reader(wire->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 source_estimate;
+    ColumnFloat64 restored_estimate;
+    agg_func->insert_result_into(source_state, source_estimate);
+    agg_func->insert_result_into(restored_state, restored_estimate);
+
+    EXPECT_DOUBLE_EQ(source_estimate.get_data()[0], restored_estimate.get_data()[0]);
+
+    agg_func->destroy(source_state);
+    agg_func->destroy(restored_state);
+}
+
+// With allocation fault injection at probability 1.0, serialize() of a populated state
+// must throw doris::Exception with code MEM_ALLOC_FAILED and a message that contains
+// "serialize HLL sketch".
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testSerializeMemAllocFailedThrowsMemAllocFailed) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // The state is populated before injection is switched on.
+    datasketches::hll_sketch sketch(12, datasketches::HLL_8);
+    for (int i = 0; i < 2000; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data(reinterpret_cast<const char*>(ser.data()), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+    agg_func->add(place, columns, 0, *arena);
+
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            agg_func->serialize(place, w);
+            // Non-fatal on purpose: FAIL() would return from the test body and skip the
+            // destroy(place) call below.
+            ADD_FAILURE() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+            EXPECT_NE(e.to_string().find("serialize HLL sketch"), std::string::npos);
+        }
+    }
+
+    agg_func->destroy(place);
+}
+
+// Works on AggregateFunctionHllSketchData directly: after merging an lgK = 12 sketch, a
+// merge of an lgK = 8 sketch under allocation fault injection (probability 1.0) must throw
+// doris::Exception with code MEM_ALLOC_FAILED and a message that contains
+// "update HLL sketch".
+// NOTE(review): per the test name the second merge is expected to hit the down-sampling
+// path of the union; confirm against the data-class implementation.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testDataMergeDownsampleMemAllocFailed) {
+    using Data = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using Alloc = Data::Alloc;
+    using Sketch = Data::Sketch;
+
+    // Inserts the values 0..4999 into `target`.
+    auto fill_5000 = [](Sketch& target) {
+        for (int value = 0; value < 5000; ++value) {
+            target.update(value);
+        }
+    };
+
+    Data state;
+
+    Sketch wide_sketch(12, datasketches::HLL_8, true, Alloc());
+    fill_5000(wide_sketch);
+    EXPECT_NO_THROW(state.merge(wide_sketch));
+
+    Sketch narrow_sketch(8, datasketches::HLL_8, true, Alloc());
+    fill_5000(narrow_sketch);
+
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            state.merge(narrow_sketch);
+            FAIL() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+            EXPECT_NE(e.to_string().find("update HLL sketch"), std::string::npos);
+        }
+    }
+}
+
+} // namespace doris
diff --git a/build.sh b/build.sh
index 66c9836cf0a..c0cba232b2d 100755
--- a/build.sh
+++ b/build.sh
@@ -34,6 +34,7 @@ if [[ -z "${DORIS_THIRDPARTY}" ]]; then
     export DORIS_THIRDPARTY="${DORIS_HOME}/thirdparty"
 fi
 export TP_INCLUDE_DIR="${DORIS_THIRDPARTY}/installed/include"
+export TP_INSTALLED_DIR="${DORIS_THIRDPARTY}/installed"
 export TP_LIB_DIR="${DORIS_THIRDPARTY}/installed/lib"
 HADOOP_DEPS_NAME="hadoop-deps"
 . "${DORIS_HOME}/env.sh"
@@ -628,6 +629,14 @@ FE_MODULES="$(
 
 # Clean and build Backend
 if [[ "${BUILD_BE}" -eq 1 ]]; then
+
+    # Fetch datasketches-cpp and install it into the third-party prefix before the
+    # backend build starts.
+    echo "install datasketches-cpp to thirdparty path before build be"
+    update_submodule "contrib/datasketches-cpp" "datasketches-cpp" "https://github.com/apache/datasketches-cpp/archive/refs/heads/master.tar.gz"
+    cd "${DORIS_HOME}/contrib/datasketches-cpp"
+    # The install prefix is quoted so a third-party path containing spaces or glob
+    # characters is passed to CMake as a single argument.
+    "${CMAKE_CMD}" -S . -B build/Release -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=${TP_INSTALLED_DIR}" -DBUILD_TESTS=OFF
+    "${CMAKE_CMD}" --build build/Release -t install
+    cd "${DORIS_HOME}"
+
     update_submodule "contrib/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz";
     update_submodule "contrib/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz";
     update_submodule "contrib/openblas" "openblas" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/openblas.tar.gz";
diff --git a/contrib/datasketches-cpp b/contrib/datasketches-cpp
new file mode 160000
index 00000000000..de8553ba372
--- /dev/null
+++ b/contrib/datasketches-cpp
@@ -0,0 +1 @@
+Subproject commit de8553ba372e618382c2e7b44b0ffc9422b9458c
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index 66140acc790..8d08bba9ce7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -38,6 +38,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.DataSketchesHllUnionAgg;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
@@ -148,6 +149,8 @@ public class BuiltinAggregateFunctions implements 
FunctionHelper {
                 agg(Histogram.class, "hist", "histogram"),
                 agg(HllUnion.class, "hll_raw_agg", "hll_union"),
                 agg(HllUnionAgg.class, "hll_union_agg"),
+                agg(DataSketchesHllUnionAgg.class, 
"datasketches_hll_union_agg",
+                    "ds_hll_estimate", "datasketches_hll_estimate"),
                 agg(IntersectCount.class, "intersect_count"),
                 agg(Kurt.class, "kurt", "kurt_pop", "kurtosis"),
                 agg(LinearHistogram.class, "linear_histogram"),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
new file mode 100644
index 00000000000..9fe46d2a77d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.agg;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.functions.FunctionTrait;
+import org.apache.doris.nereids.trees.expressions.literal.DoubleLiteral;
+import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.DoubleType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.VarBinaryType;
+import org.apache.doris.nereids.types.VarcharType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** datasketches_hll_union_agg agg function: takes a STRING/VARCHAR/VARBINARY argument, returns DOUBLE. */
+public class DataSketchesHllUnionAgg extends NotNullableAggregateFunction
+        implements UnaryExpression, ExplicitlyCastableSignature, 
FunctionTrait, RollUpTrait {
+    public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
+            
FunctionSignature.ret(DoubleType.INSTANCE).args(StringType.INSTANCE),
+            
FunctionSignature.ret(DoubleType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT),
+            
FunctionSignature.ret(DoubleType.INSTANCE).args(VarBinaryType.INSTANCE)
+    );
+
+    /**
+     * constructor with 1 argument, using the function name datasketches_hll_union_agg.
+     */
+    public DataSketchesHllUnionAgg(Expression arg) {
+        super("datasketches_hll_union_agg", arg);
+    }
+
+    /**
+     * constructor with a distinct flag and 1 argument; the flag is ignored and this(arg) is called.
+     */
+    public DataSketchesHllUnionAgg(boolean distinct, Expression arg) {
+        this(arg);
+    }
+
+    /** constructor for withChildren and reuse signature */
+    protected DataSketchesHllUnionAgg(AggregateFunctionParams functionParams) {
+        super(functionParams);
+    }
+
+    @Override
+    public void checkLegalityBeforeTypeCoercion() {
+        DataType inputType = getArgumentType(0);
+        if (!(inputType.isStringType() || inputType.isVarcharType() || 
inputType.isVarBinaryType()
+                || inputType.isNullType())) {
+            throw new AnalysisException(getName()
+                + " function's argument should be of STRING/VARCHAR/VARBINARY 
type, but was " + inputType);
+        }
+    }
+
+    @Override
+    protected List<DataType> intermediateTypes() {
+        return ImmutableList.of(StringType.INSTANCE);
+    }
+
+    @Override
+    public List<FunctionSignature> getSignatures() {
+        return SIGNATURES;
+    }
+
+    @Override
+    public DataSketchesHllUnionAgg withDistinctAndChildren(boolean distinct, 
List<Expression> children) {
+        Preconditions.checkArgument(children.size() == 1);
+        return new DataSketchesHllUnionAgg(getFunctionParams(distinct, 
children));
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitDataSketchesHllUnionAgg(this, context);
+    }
+
+    @Override
+    public Function constructRollUp(Expression param, Expression... varParams) 
{
+        return new 
DataSketchesHllUnionAgg(getFunctionParams(ImmutableList.of(param)));
+    }
+
+    @Override
+    public boolean canRollUp() {
+        return false; // NOTE(review): constructRollUp above is implemented, yet roll-up is disabled here
+    }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new DoubleLiteral(0);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 251012c16fe..5292a967c27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -39,6 +39,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.DataSketchesHllUnionAgg;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
@@ -238,6 +239,10 @@ public interface AggregateFunctionVisitor<R, C> {
         return visitAggregateFunction(histogram, context);
     }
 
+    default R visitDataSketchesHllUnionAgg(DataSketchesHllUnionAgg 
datasketchesHllUnionAgg, C context) {
+        return visitAggregateFunction(datasketchesHllUnionAgg, context);
+    }
+
     default R visitHllUnion(HllUnion hllUnion, C context) {
         return visitAggregateFunction(hllUnion, context);
     }
diff --git 
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
 
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
new file mode 100644
index 00000000000..ae1424305c7
--- /dev/null
+++ 
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
@@ -0,0 +1,28 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !basic_union --
+17
+
+-- !aliases --
+17     17      17
+
+-- !group_by --
+1      7
+2      10
+
+-- !distinct --
+17     17
+
+-- !basic_union_varchar --
+17
+
+-- !aliases_varchar --
+17     17      17
+
+-- !basic_union_varbinary --
+17
+
+-- !aliases_varbinary --
+17     17      17
+
+-- !empty_input --
+0
diff --git 
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
 
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
new file mode 100644
index 00000000000..8d86b8c82d6
--- /dev/null
+++ 
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_datasketches_hll_union_agg") {
+    def tableName = "test_datasketches_hll_union_agg_tbl"
+    def varcharTableName = "test_datasketches_hll_union_agg_varchar_tbl"
+    def emptyTableName = "test_datasketches_hll_union_agg_empty_tbl"
+    def badTableName = "test_datasketches_hll_union_agg_bad_tbl"
+
+    // sk = new HllSketch(8, HLL_8); for (int i = 0; i < 7; i++) sk.update(i);
+    def sk1Base64 = "AgEHCAMIBwjL18IEK/L7BoYv+Q11gWYHgbxdBntl5gj8LUIK"
+
+    // sk = new HllSketch(8, HLL_8); for (int i = 20; i < 30; i++) 
sk.update(i);
+    def sk2Base64 = 
"AwEHCAUIAAkKAAAAIjvrBcS1nwfGGWoEyHokBO8t9wc1qTEENkcJB7hWqQxZf9QNnuSbGA=="
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            id INT,
+            sk STRING
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+            (1, from_base64('${sk1Base64}')),
+            (2, from_base64('${sk2Base64}')),
+            (3, NULL)
+    """
+
+    // 1) Basic union: {0..6} U {20..29} => 17 distinct values
+    qt_basic_union """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS 
BIGINT) FROM ${tableName}"""
+
+    // 2) Aliases should behave identically
+    qt_aliases """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+        FROM ${tableName}
+    """
+
+    // 3) Group-by
+    qt_group_by """SELECT id, CAST(ROUND(datasketches_hll_union_agg(sk)) AS 
BIGINT)
+        FROM ${tableName}
+        WHERE id IN (1, 2)
+        GROUP BY id
+        ORDER BY id
+    """
+
+    // 4) After inserting a duplicate of sk1, DISTINCT and non-DISTINCT unions should give the same result
+    sql "INSERT INTO ${tableName} VALUES (5, from_base64('${sk1Base64}'))"
+    qt_distinct """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_union_agg(DISTINCT sk)) AS BIGINT)
+        FROM ${tableName}
+    """
+
+    // 4.1) Input type coverage: VARCHAR
+    sql "DROP TABLE IF EXISTS ${varcharTableName}"
+    sql """
+        CREATE TABLE ${varcharTableName} (
+            id INT,
+            sk VARCHAR(256)
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${varcharTableName} VALUES
+            (1, from_base64('${sk1Base64}')),
+            (2, from_base64('${sk2Base64}')),
+            (3, NULL)
+    """
+
+    qt_basic_union_varchar """SELECT 
CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM 
${varcharTableName}"""
+
+    qt_aliases_varchar """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+        FROM ${varcharTableName}
+    """
+
+    // 4.2) Input type coverage: VARBINARY (no table column; VARBINARY is not 
supported for table storage)
+    qt_basic_union_varbinary """SELECT 
CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM (
+            SELECT from_base64_binary('${sk1Base64}') AS sk
+            UNION ALL SELECT from_base64_binary('${sk2Base64}')
+            UNION ALL SELECT NULL
+        ) t
+    """
+
+    qt_aliases_varbinary """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+        FROM (
+            SELECT from_base64_binary('${sk1Base64}') AS sk
+            UNION ALL SELECT from_base64_binary('${sk2Base64}')
+            UNION ALL SELECT NULL
+        ) t
+    """
+
+    // 5) Empty input should return 0
+    sql "DROP TABLE IF EXISTS ${emptyTableName}"
+    sql """
+        CREATE TABLE ${emptyTableName} (
+            id INT,
+            sk STRING
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+    qt_empty_input """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS 
BIGINT) FROM ${emptyTableName}"""
+
+    // 6) Illegal input should throw (base64 is valid but bytes are not a 
datasketches HLL sketch)
+    test {
+        sql """SELECT datasketches_hll_union_agg(from_base64('AA=='))"""
+        exception "CORRUPTION"
+    }
+    test {
+        sql """SELECT ds_hll_estimate(from_base64('AA=='))"""
+        exception "CORRUPTION"
+    }
+    test {
+        sql """SELECT datasketches_hll_estimate(from_base64('AA=='))"""
+        exception "CORRUPTION"
+    }
+
+    // Empty string is a valid STRING value, but it is an invalid serialized 
DataSketches HLL sketch.
+    // It should not fail at INSERT time; it should fail when the aggregate 
function reads it.
+    sql "DROP TABLE IF EXISTS ${badTableName}"
+    sql """
+        CREATE TABLE ${badTableName} (
+            id INT,
+            sk STRING
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+    sql """INSERT INTO ${badTableName} VALUES (1, '')"""
+    test {
+        sql """SELECT datasketches_hll_union_agg(sk) FROM ${badTableName}"""
+        exception "CORRUPTION"
+    }
+}
diff --git a/run-be-ut.sh b/run-be-ut.sh
index c9579ead354..1779edf05d7 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -41,7 +41,12 @@ ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && 
pwd)"
 
 export ROOT
 export DORIS_HOME="${ROOT}"
-
+if [[ -z "${DORIS_THIRDPARTY}" ]]; then
+    export DORIS_THIRDPARTY="${DORIS_HOME}/thirdparty"
+fi
+export TP_INCLUDE_DIR="${DORIS_THIRDPARTY}/installed/include"
+export TP_INSTALLED_DIR="${DORIS_THIRDPARTY}/installed"
+export TP_LIB_DIR="${DORIS_THIRDPARTY}/installed/lib"
 . "${DORIS_HOME}/env.sh"
 
 # Check args
@@ -174,6 +179,13 @@ update_submodule() {
     fi
 }
 
+echo "install datasketches-cpp to thirdparty path before build backend ut"
+update_submodule "contrib/datasketches-cpp" "datasketches-cpp" 
"https://github.com/apache/datasketches-cpp/archive/refs/heads/master.tar.gz";
+cd "${DORIS_HOME}/contrib/datasketches-cpp"
+"${CMAKE_CMD}" -S . -B build/Release -DCMAKE_BUILD_TYPE=Release 
-DCMAKE_INSTALL_PREFIX="${TP_INSTALLED_DIR}" -DBUILD_TESTS=OFF
+"${CMAKE_CMD}" --build build/Release -t install
+cd "${DORIS_HOME}"
+
 update_submodule "contrib/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz";
 update_submodule "contrib/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to