This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new ebd75c9d08e branch-4.1: Add datasketches HLL sketch aggregate
functions #63143 (#63911)
ebd75c9d08e is described below
commit ebd75c9d08ef51397573bdafc916f0f867bb91b8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 1 10:06:41 2026 +0800
branch-4.1: Add datasketches HLL sketch aggregate functions #63143 (#63911)
Cherry-picked from #63143
Co-authored-by: nooneuse <[email protected]>
Co-authored-by: yuanyuhao <[email protected]>
---
.gitmodules | 3 +
...gregate_function_datasketches_hll_union_agg.cpp | 44 +
...aggregate_function_datasketches_hll_union_agg.h | 243 +++++
.../aggregate_function_simple_factory.cpp | 3 +
.../agg_datasketches_hll_union_agg_test.cpp | 1097 ++++++++++++++++++++
build.sh | 9 +
contrib/datasketches-cpp | 1 +
.../doris/catalog/BuiltinAggregateFunctions.java | 3 +
.../functions/agg/DataSketchesHllUnionAgg.java | 113 ++
.../visitor/AggregateFunctionVisitor.java | 5 +
.../test_datasketches_hll_union_agg.out | 28 +
.../test_datasketches_hll_union_agg.groovy | 170 +++
run-be-ut.sh | 14 +-
13 files changed, 1732 insertions(+), 1 deletion(-)
diff --git a/.gitmodules b/.gitmodules
index 54c1a8a3636..eb8e703aa8a 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -29,3 +29,6 @@
path = contrib/openblas
url = https://github.com/apache/doris-thirdparty.git
branch = openblas
+[submodule "contrib/datasketches-cpp"]
+ path = contrib/datasketches-cpp
+ url = https://github.com/apache/datasketches-cpp.git
diff --git
a/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
new file mode 100644
index 00000000000..c9b7013e7a9
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
+
+#include <string>
+
+#include "core/data_type/data_type.h"
+#include "core/data_type/define_primitive_type.h"
+#include "exec/common/hash_table/hash.h" // IWYU pragma: keep
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+namespace doris {
+/// Builds a datasketches_hll_union_agg instance for the given argument types.
+/// Dispatch is limited to STRING, VARCHAR and VARBINARY, the column types that can
+/// carry serialized HLL sketch bytes. `name` and `result_type` belong to the
+/// factory creator signature and are not consulted here.
+template <template <PrimitiveType> class Data>
+AggregateFunctionPtr create_aggregate_function_datasketches_hll_union_agg(
+        const std::string& name, const DataTypes& argument_types, const DataTypePtr& result_type,
+        const bool result_is_nullable, const AggregateFunctionAttr& attr) {
+    using SketchInputTypes = creator_with_type_list<TYPE_STRING, TYPE_VARCHAR, TYPE_VARBINARY>;
+    return SketchInputTypes::create<AggregateFunctionDataSketchesHllUnionAgg, Data>(
+            argument_types, result_is_nullable, attr);
+}
+/// Registers datasketches_hll_union_agg through register_function_both and adds
+/// the aliases ds_hll_estimate and datasketches_hll_estimate for it.
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory) {
+    const std::string canonical_name = "datasketches_hll_union_agg";
+    const AggregateFunctionCreator union_agg_creator =
+            create_aggregate_function_datasketches_hll_union_agg<AggregateFunctionHllSketchData>;
+    factory.register_function_both(canonical_name, union_agg_creator);
+    for (const char* alias : {"ds_hll_estimate", "datasketches_hll_estimate"}) {
+        factory.register_alias(canonical_name, alias);
+    }
+}
+} // namespace doris
diff --git
a/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
new file mode 100644
index 00000000000..d9f82f193e8
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <stddef.h>
+
+#include <DataSketches/hll.hpp>
+#include <algorithm>
+#include <boost/iterator/iterator_facade.hpp>
+#include <memory>
+#include <optional>
+#include <type_traits>
+#include <vector>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "core/assert_cast.h"
+#include "core/column/column.h"
+#include "core/column/column_varbinary.h"
+#include "core/column/column_vector.h"
+#include "core/custom_allocator.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/define_primitive_type.h"
+#include "core/field.h"
+#include "core/string_ref.h"
+#include "core/types.h"
+#include "core/uint128.h"
+#include "exec/common/hash_table/hash.h"
+#include "exec/common/hash_table/phmap_fwd_decl.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "util/var_int.h"
+template <typename T>
+struct HashCRC32;
+namespace doris {
+class Arena;
+class BufferReadable;
+class BufferWritable;
+template <PrimitiveType T>
+class ColumnDecimal;
+/// datasketches_hll_union_agg
+/// Aggregate state: an Apache DataSketches HLL union that is created lazily from the
+/// first non-empty sketch merged into it.
+template <PrimitiveType T>
+struct AggregateFunctionHllSketchData {
+    /** We set the default LgK to 12,
+     *  as this value is used as a performance baseline in the relevant documentation.
+     *  (https://datasketches.apache.org/docs/HLL/HllPerformance.html)
+     */
+    static constexpr uint8_t DEFAULT_LOG_K = 12;
+    using Alloc = CustomStdAllocator<uint8_t>;
+    using Sketch = datasketches::hll_sketch_alloc<Alloc>;
+    using Union = datasketches::hll_union_alloc<Alloc>;
+
+    // Disengaged until a non-empty sketch has been merged; get_result() reports 0.0
+    // for a disengaged state.
+    std::optional<Union> hll_union_data;
+
+    static String get_name() { return "datasketches_hll_union_agg"; }
+
+    /// Unions `sketch_data` into this state, creating the union on first use.
+    ///
+    /// Empty sketches hold no data and are skipped before the union is created.
+    /// The union's lg_max_k is fixed by the sketch that creates it. An empty
+    /// placeholder, such as the DEFAULT_LOG_K sketch that write() emits for a
+    /// state that never received a row, would otherwise cap lg_max_k at 12 for
+    /// every higher-precision sketch merged afterwards. The final estimate would
+    /// then depend on the order in which partial states arrive.
+    ///
+    /// @throws doris::Exception if the DataSketches update fails.
+    void merge(const Sketch& sketch_data) {
+        if (sketch_data.is_empty()) {
+            return;
+        }
+        if (!hll_union_data.has_value()) {
+            /** lg_max_k is taken from this first non-empty sketch and clamped to
+             *  [7, 21], the range the hll_union documentation requires.
+             *  See: datasketches-cpp/hll/include/hll.hpp:451
+             */
+            constexpr uint8_t MIN_UNION_LOG_K = 7;
+            const uint8_t union_lg_k =
+                    std::clamp<uint8_t>(sketch_data.get_lg_config_k(), MIN_UNION_LOG_K,
+                                        datasketches::hll_constants::MAX_LOG_K);
+            hll_union_data.emplace(union_lg_k, Alloc());
+        }
+        try {
+            hll_union_data->update(sketch_data);
+        } catch (const doris::Exception& e) {
+            throw Exception(e.code(), "Internal error happened when update HLL sketch: {}",
+                            e.to_string());
+        } catch (const std::exception& e) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when update HLL sketch: {}", e.what());
+        } catch (...) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when update HLL sketch: unknown exception.");
+        }
+    }
+
+    /// Drops the union (if any), returning the state to "no data seen".
+    /// Destroying the optional's Union releases everything it holds.
+    void reset() { hll_union_data.reset(); }
+
+    /// Serializes `sk` in compact form and writes it with buf.write_binary().
+    void write_sketch(BufferWritable& buf, const Sketch& sk) const {
+        auto serialized_bytes = sk.serialize_compact();
+        StringRef d(serialized_bytes.data(), serialized_bytes.size());
+        buf.write_binary(d);
+    }
+
+    /// Serializes the state as one compact HLL sketch. A state without a union
+    /// writes an empty sketch built with DEFAULT_LOG_K, so read() always finds one.
+    /// @throws doris::Exception if producing or serializing the result fails.
+    void write(BufferWritable& buf) const {
+        if (!hll_union_data.has_value()) {
+            /** Using DEFAULT_LOG_K(12) here is sufficient: the serialized sketch holds
+             *  no data, and merge() skips empty sketches, so this lg_k never
+             *  constrains the state that reads it.
+             */
+            Union u(DEFAULT_LOG_K, Alloc());
+            write_sketch(buf, u.get_result());
+            return;
+        }
+        try {
+            auto cache = hll_union_data->get_result();
+            write_sketch(buf, cache);
+        } catch (const doris::Exception& e) {
+            throw Exception(e.code(), "Internal error happened when serialize HLL sketch: {}",
+                            e.to_string());
+        } catch (const std::exception& e) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when serialize HLL sketch: {}", e.what());
+        } catch (...) {
+            throw Exception(
+                    ErrorCode::INTERNAL_ERROR,
+                    "Internal error happened when serialize HLL sketch: unknown exception.");
+        }
+    }
+
+    /// Reads one serialized sketch (as produced by write()) and merges it in.
+    /// @throws doris::Exception; library decode failures are reported as CORRUPTION.
+    void read(BufferReadable& buf) {
+        StringRef d;
+        buf.read_binary(d);
+
+        auto cache = [&]() -> Sketch {
+            try {
+                return Sketch::deserialize(d.data, d.size, Alloc());
+            } catch (const doris::Exception& e) {
+                throw Exception(e.code(), "Failed to deserialize HLL sketch when read: {}",
+                                e.to_string());
+            } catch (const std::exception& e) {
+                throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when read: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(ErrorCode::CORRUPTION,
+                                "HLL sketch data corrupted when read: unknown exception.");
+            }
+        }();
+
+        merge(cache);
+    }
+
+    /// @return the union's cardinality estimate, or 0.0 if no non-empty sketch was merged.
+    /// @throws doris::Exception if the DataSketches estimate call fails.
+    double get_result() const {
+        if (hll_union_data.has_value()) {
+            try {
+                return hll_union_data->get_estimate();
+            } catch (const doris::Exception& e) {
+                throw Exception(e.code(),
+                                "Internal error happened when get HLL sketch estimate: {}",
+                                e.to_string());
+            } catch (const std::exception& e) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Internal error happened when get HLL sketch estimate: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(
+                        ErrorCode::INTERNAL_ERROR,
+                        "Internal error happened when get HLL sketch estimate: unknown exception.");
+            }
+        }
+        return 0.0;
+    }
+};
+
+/// Calculates the number of different values approximately using hll sketch.
+/// Every input row must hold a serialized DataSketches HLL sketch. The sketches of a
+/// group are unioned, and the union's estimate is returned as a Float64.
+template <PrimitiveType T, typename Data>
+class AggregateFunctionDataSketchesHllUnionAgg final
+        : public IAggregateFunctionDataHelper<Data,
+                                              AggregateFunctionDataSketchesHllUnionAgg<T, Data>>,
+          UnaryExpression,
+          NotNullableAggregateFunction {
+    // add() reinterprets the raw bytes of each row as a sketch, so only string-like
+    // columns make sense. This rejects other instantiations at compile time instead
+    // of compiling an add() that silently ignores every row.
+    static_assert(is_string_type(T) || is_varbinary(T),
+                  "datasketches_hll_union_agg requires a STRING, VARCHAR or VARBINARY argument");
+
+public:
+    AggregateFunctionDataSketchesHllUnionAgg(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, AggregateFunctionDataSketchesHllUnionAgg<T, Data>>(
+                      argument_types_) {}
+
+    String get_name() const override { return Data::get_name(); }
+
+    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); }
+
+    void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
+
+    /// Decodes the sketch in row `row_num` of the single argument column and unions it in.
+    /// @throws doris::Exception (CORRUPTION) on empty or undecodable input.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
+             Arena&) const override {
+        add_one(this->data(place), *columns[0], row_num);
+    }
+
+    /// Unions the state at `rhs` into `place`. A rhs that never built a union is a no-op.
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        const auto& rhs_data = this->data(rhs);
+        if (!rhs_data.hll_union_data.has_value()) {
+            return;
+        }
+        this->data(place).merge(rhs_data.hll_union_data->get_result(datasketches::HLL_8));
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        this->data(place).write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        this->data(place).read(buf);
+    }
+
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get_result());
+    }
+
+private:
+    /// Deserializes the sketch stored at `row_num` of `column` and merges it into `data`.
+    /// Empty values are rejected: they are never a valid serialized sketch.
+    static void ALWAYS_INLINE add_one(Data& data, const IColumn& column, ssize_t row_num) {
+        const auto& src_column =
+                assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
+                            TypeCheckOnRelease::DISABLE>(column);
+        StringRef value = src_column.get_data_at(static_cast<size_t>(row_num));
+        if (value.empty()) {
+            throw Exception(ErrorCode::CORRUPTION,
+                            "HLL sketch data corrupted when add: empty input.");
+        }
+
+        using Sketch = typename Data::Sketch;
+        using Alloc = typename Data::Alloc;
+
+        auto sketch_data = [&]() -> Sketch {
+            try {
+                return Sketch::deserialize(value.begin(), value.size, Alloc());
+            } catch (const doris::Exception& e) {
+                throw Exception(e.code(), "Failed to deserialize HLL sketch when add: {}",
+                                e.to_string());
+            } catch (const std::exception& e) {
+                throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when add: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(ErrorCode::CORRUPTION,
+                                "HLL sketch data corrupted when add: unknown exception.");
+            }
+        }();
+
+        data.merge(sketch_data);
+    }
+};
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index 9fc311cdf08..6e18e95bee3 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -38,6 +38,8 @@ void
register_aggregate_function_avg(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_count(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_count_by_enum(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory&
factory);
+void register_aggregate_function_datasketches_HLL_union_agg(
+ AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory);
void
register_aggregate_function_uniq_distribute_key(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory);
@@ -128,6 +130,7 @@ AggregateFunctionSimpleFactory&
AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_replace_reader_load(instance);
register_aggregate_function_window_lead_lag_first_last(instance);
register_aggregate_function_HLL_union_agg(instance);
+ register_aggregate_function_datasketches_HLL_union_agg(instance);
register_aggregate_functions_corr(instance);
register_aggregate_functions_corr_welford(instance);
register_aggregate_function_covar_pop(instance);
diff --git a/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
b/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
new file mode 100644
index 00000000000..7783ec89e2c
--- /dev/null
+++ b/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
@@ -0,0 +1,1097 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <DataSketches/hll.hpp>
+
+#include "agent/be_exec_version_manager.h"
+#include "common/config.h"
+#include "common/status.h"
+#include "core/column/column.h"
+#include "core/column/column_string.h"
+#include "core/column/column_varbinary.h"
+#include "core/custom_allocator.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_varbinary.h"
+#include "exec/common/hash_table/hash.h"
+#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+
+namespace doris {
+
+void register_aggregate_function_datasketches_HLL_union_agg(
+ AggregateFunctionSimpleFactory& factory);
+
+/// Fixture that gives every test a fresh Arena for aggregate-state allocation.
+class AggregateFunctionDataSketchesHllUnionAggTest : public ::testing::Test {
+protected:
+    // Backing storage for the aggregate states created by a test; rebuilt per test.
+    std::unique_ptr<Arena> arena;
+
+    void SetUp() override { arena = std::make_unique<Arena>(); }
+    void TearDown() override { arena = nullptr; }
+};
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testBasicUnion) {
+    // Two overlapping lg_k=8 sketches over [0,100) and [50,150) should union to an
+    // estimate near 150 distinct values.
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    datasketches::hll_sketch lower(8);
+    for (int v = 0; v < 100; ++v) {
+        lower.update(v);
+    }
+    datasketches::hll_sketch upper(8);
+    for (int v = 50; v < 150; ++v) {
+        upper.update(v);
+    }
+
+    // One serialized sketch per row.
+    auto input = ColumnString::create();
+    auto append_sketch = [&input](const datasketches::hll_sketch& sk) {
+        const auto bytes = sk.serialize_compact();
+        input->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    };
+    append_sketch(lower);
+    append_sketch(upper);
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    const IColumn* columns[1] = {input.get()};
+    for (ssize_t row = 0; row < 2; ++row) {
+        agg_func->add(state, columns, row, *arena);
+    }
+
+    ColumnFloat64 estimates;
+    agg_func->insert_result_into(state, estimates);
+    const double estimate = estimates.get_data()[0];
+    EXPECT_GE(estimate, 130.0);
+    EXPECT_LE(estimate, 170.0);
+
+    agg_func->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMergeTwoAggStates) {
+    // Two partial states over the disjoint ranges [0,100) and [100,200) are merged;
+    // the combined estimate should land near 200.
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    auto new_state = [&]() {
+        AggregateDataPtr p =
+                arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+        agg_func->create(p);
+        return p;
+    };
+    AggregateDataPtr lhs = new_state();
+    AggregateDataPtr rhs = new_state();
+
+    auto serialize_range = [](int begin, int end) {
+        datasketches::hll_sketch sk(8);
+        for (int v = begin; v < end; ++v) {
+            sk.update(v);
+        }
+        return sk.serialize_compact();
+    };
+    const auto first_bytes = serialize_range(0, 100);
+    const auto second_bytes = serialize_range(100, 200);
+
+    auto input = ColumnString::create();
+    input->insert_data(reinterpret_cast<const char*>(first_bytes.data()), first_bytes.size());
+    input->insert_data(reinterpret_cast<const char*>(second_bytes.data()), second_bytes.size());
+
+    const IColumn* columns[1] = {input.get()};
+    agg_func->add(lhs, columns, 0, *arena);
+    agg_func->add(rhs, columns, 1, *arena);
+
+    agg_func->merge(lhs, rhs, *arena);
+
+    ColumnFloat64 merged;
+    agg_func->insert_result_into(lhs, merged);
+    const double estimate = merged.get_data()[0];
+    EXPECT_GE(estimate, 170.0);
+    EXPECT_LE(estimate, 230.0); // 200 unique values expected
+
+    agg_func->destroy(lhs);
+    agg_func->destroy(rhs);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMergeEmptyStateDoesNotCrash) {
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    AggregateDataPtr place_with_data =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place_with_data);
+
+    AggregateDataPtr empty_rhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_rhs_place);
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+    // Row 1 is an empty string, which is not a valid serialized sketch.
+    column_string->insert_data("", 0);
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place_with_data, columns, 0, *arena);
+
+    // Covers the "all NULL" style path: rhs exists but never saw add().
+    EXPECT_NO_THROW(agg_func->merge(place_with_data, empty_rhs_place, *arena));
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place_with_data, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], sketch.get_estimate());
+
+    // An empty string must be rejected by add() rather than treated as an empty
+    // state. A dedicated place keeps this check independent of the states above.
+    AggregateDataPtr rejecting_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(rejecting_place);
+    EXPECT_THROW(agg_func->add(rejecting_place, columns, 1, *arena), Exception);
+
+    // Merging two states that never saw add() must also be a harmless no-op.
+    AggregateDataPtr empty_lhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_lhs_place);
+    EXPECT_NO_THROW(agg_func->merge(empty_lhs_place, empty_rhs_place, *arena));
+
+    ColumnFloat64 empty_merge_result;
+    agg_func->insert_result_into(empty_lhs_place, empty_merge_result);
+    EXPECT_DOUBLE_EQ(empty_merge_result.get_data()[0], 0.0);
+
+    agg_func->destroy(place_with_data);
+    agg_func->destroy(empty_rhs_place);
+    agg_func->destroy(rejecting_place);
+    agg_func->destroy(empty_lhs_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testEmptyState) {
+    // A freshly created state that never received a sketch must report 0.
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+    const auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    ColumnFloat64 estimates;
+    agg_func->insert_result_into(state, estimates);
+    EXPECT_DOUBLE_EQ(estimates.get_data()[0], 0.0);
+
+    agg_func->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testSerializeDeserialize) {
+    // Round-trips a populated state through serialize()/deserialize() and checks
+    // that the restored state reports the same estimate.
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    datasketches::hll_sketch source_sketch(8);
+    for (int v = 0; v < 100; ++v) {
+        source_sketch.update(v);
+    }
+    const auto source_bytes = source_sketch.serialize_compact();
+
+    auto input = ColumnString::create();
+    input->insert_data(reinterpret_cast<const char*>(source_bytes.data()), source_bytes.size());
+    const IColumn* columns[1] = {input.get()};
+
+    auto make_state = [&]() {
+        AggregateDataPtr p =
+                arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+        agg_func->create(p);
+        return p;
+    };
+
+    AggregateDataPtr original_state = make_state();
+    agg_func->add(original_state, columns, 0, *arena);
+
+    auto wire = ColumnString::create();
+    BufferWritable writer(*wire);
+    agg_func->serialize(original_state, writer);
+    writer.commit();
+
+    AggregateDataPtr restored_state = make_state();
+    BufferReadable reader(wire->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 original_result;
+    ColumnFloat64 restored_result;
+    agg_func->insert_result_into(original_state, original_result);
+    agg_func->insert_result_into(restored_state, restored_result);
+    EXPECT_DOUBLE_EQ(original_result.get_data()[0], restored_result.get_data()[0]);
+
+    agg_func->destroy(original_state);
+    agg_func->destroy(restored_state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testReset) {
+    // reset() must discard everything added so far, leaving an estimate of 0.
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch(8);
+    for (int v = 0; v < 100; ++v) {
+        sketch.update(v);
+    }
+    const auto bytes = sketch.serialize_compact();
+
+    auto input = ColumnString::create();
+    input->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input.get()};
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+    agg_func->add(state, columns, 0, *arena);
+
+    agg_func->reset(state);
+
+    ColumnFloat64 estimates;
+    agg_func->insert_result_into(state, estimates);
+    EXPECT_DOUBLE_EQ(estimates.get_data()[0], 0.0);
+
+    agg_func->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testResetThenAddReinitializesState) {
+    // After reset(), the next add() must start over, so the estimate equals that of
+    // the second sketch alone.
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    // Seven consecutive integers starting at `first`, in an lg_k=8 HLL_8 sketch.
+    auto build_sketch = [](int first) {
+        datasketches::hll_sketch sk(8, datasketches::HLL_8);
+        for (int v = first; v < first + 7; ++v) {
+            sk.update(v);
+        }
+        return sk;
+    };
+    const auto discarded = build_sketch(0);
+    const auto kept = build_sketch(10);
+
+    auto input = ColumnString::create();
+    for (const auto* sk : {&discarded, &kept}) {
+        const auto bytes = sk->serialize_compact();
+        input->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    }
+    const IColumn* columns[1] = {input.get()};
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    agg_func->add(state, columns, 0, *arena);
+    agg_func->reset(state);
+    agg_func->add(state, columns, 1, *arena);
+
+    ColumnFloat64 estimates;
+    agg_func->insert_result_into(state, estimates);
+    EXPECT_DOUBLE_EQ(estimates.get_data()[0], kept.get_estimate());
+
+    agg_func->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testVarcharResetThenAddAndMergeEmptyRhsDoesNotCrash) {
+    // VARCHAR variant: reset-then-add must restart the union, and merging states that
+    // never saw add() must be a harmless no-op.
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    datasketches::hll_sketch sketch1(8, datasketches::HLL_8);
+    for (int i = 0; i < 7; ++i) {
+        sketch1.update(i);
+    }
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8, datasketches::HLL_8);
+    for (int i = 10; i < 17; ++i) {
+        sketch2.update(i);
+    }
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser1.data(), ser1.size());
+    column_string->insert_data((const char*)ser2.data(), ser2.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    agg_func->add(place, columns, 0, *arena);
+    agg_func->reset(place);
+    agg_func->add(place, columns, 1, *arena);
+
+    ColumnFloat64 after_reset;
+    agg_func->insert_result_into(place, after_reset);
+    EXPECT_DOUBLE_EQ(after_reset.get_data()[0], sketch2.get_estimate());
+
+    AggregateDataPtr empty_rhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_rhs_place);
+
+    EXPECT_NO_THROW(agg_func->merge(place, empty_rhs_place, *arena));
+
+    ColumnFloat64 after_merge;
+    agg_func->insert_result_into(place, after_merge);
+    EXPECT_DOUBLE_EQ(after_merge.get_data()[0], sketch2.get_estimate());
+
+    AggregateDataPtr empty_lhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_lhs_place);
+
+    EXPECT_NO_THROW(agg_func->merge(empty_lhs_place, empty_rhs_place, *arena));
+
+    ColumnFloat64 empty_merge_result;
+    agg_func->insert_result_into(empty_lhs_place, empty_merge_result);
+    EXPECT_DOUBLE_EQ(empty_merge_result.get_data()[0], 0.0);
+
+    agg_func->destroy(place);
+    agg_func->destroy(empty_rhs_place);
+    agg_func->destroy(empty_lhs_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testFactoryCreateAndAliases) {
+    // The canonical name and both aliases must resolve and produce the same estimate.
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    const int be_version = BeExecVersionManager::get_newest_version();
+    const DataTypes argument_types {std::make_shared<DataTypeString>()};
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int v = 0; v < 7; ++v) {
+        sketch.update(v);
+    }
+    const auto bytes = sketch.serialize_compact();
+    auto input = ColumnString::create();
+    input->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input.get()};
+    const double expected = sketch.get_estimate();
+
+    for (const char* fn_name :
+         {"datasketches_hll_union_agg", "ds_hll_estimate", "datasketches_hll_estimate"}) {
+        SCOPED_TRACE(fn_name);
+        const AggregateFunctionPtr fn =
+                factory.get(fn_name, argument_types, nullptr, false, be_version);
+        ASSERT_NE(fn, nullptr);
+
+        AggregateDataPtr state = arena->aligned_alloc(fn->size_of_data(), fn->align_of_data());
+        fn->create(state);
+        fn->add(state, columns, 0, *arena);
+        ColumnFloat64 result;
+        fn->insert_result_into(state, result);
+        fn->destroy(state);
+        EXPECT_DOUBLE_EQ(result.get_data()[0], expected);
+    }
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testFactoryCreateForVarcharAndNullableAndUnsupportedType) {
+    // VARCHAR input resolves under both names, Nullable(VARCHAR) resolves, and an
+    // INT argument is rejected with nullptr.
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    const int be_version = BeExecVersionManager::get_newest_version();
+
+    const DataTypes varchar_types {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    const AggregateFunctionPtr fn_varchar =
+            factory.get("datasketches_hll_union_agg", varchar_types, nullptr, false, be_version);
+    const AggregateFunctionPtr fn_varchar_alias =
+            factory.get("ds_hll_estimate", varchar_types, nullptr, false, be_version);
+    ASSERT_NE(fn_varchar, nullptr);
+    ASSERT_NE(fn_varchar_alias, nullptr);
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int v = 0; v < 7; ++v) {
+        sketch.update(v);
+    }
+    const auto bytes = sketch.serialize_compact();
+    auto input = ColumnString::create();
+    input->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {input.get()};
+
+    const double expected = sketch.get_estimate();
+    for (const AggregateFunctionPtr& fn : {fn_varchar, fn_varchar_alias}) {
+        AggregateDataPtr state = arena->aligned_alloc(fn->size_of_data(), fn->align_of_data());
+        fn->create(state);
+        fn->add(state, columns, 0, *arena);
+        ColumnFloat64 result;
+        fn->insert_result_into(state, result);
+        fn->destroy(state);
+        EXPECT_DOUBLE_EQ(result.get_data()[0], expected);
+    }
+
+    const DataTypes nullable_varchar_types {
+            make_nullable(std::make_shared<DataTypeString>(-1, TYPE_VARCHAR))};
+    EXPECT_NE(nullptr, nullptr == nullptr ? factory.get("datasketches_hll_union_agg",
+                                                        nullable_varchar_types, nullptr, false,
+                                                        be_version)
+                                          : nullptr);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testLowLgKSketchDoesNotReportCorruption) {
+    // A sketch built with a small lg_k (4) must be accepted by both add() and deserialize(),
+    // and both paths must yield the same estimate.
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch(4, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    EXPECT_NO_THROW(agg_func->add(place, columns, 0, *arena));
+
+    ColumnFloat64 add_result;
+    agg_func->insert_result_into(place, add_result);
+
+    // 100 distinct inputs into a 16-bucket (lg_k=4) sketch: only a loose bound is meaningful.
+    EXPECT_GE(add_result.get_data()[0], 50);
+    EXPECT_LE(add_result.get_data()[0], 150);
+
+    // Feed the same serialized bytes through the deserialize() path via write_binary().
+    auto buf = ColumnString::create();
+    BufferWritable w(*buf);
+    StringRef d((const char*)ser.data(), ser.size());
+    w.write_binary(d);
+    w.commit();
+
+    AggregateDataPtr place2 =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place2);
+
+    BufferReadable r(buf->get_data_at(0));
+    EXPECT_NO_THROW(agg_func->deserialize(place2, r, *arena));
+
+    ColumnFloat64 deserialize_result;
+    agg_func->insert_result_into(place2, deserialize_result);
+    // Estimates are doubles: compare with the ULP-tolerant EXPECT_DOUBLE_EQ rather than
+    // EXPECT_EQ, consistent with the other estimate comparisons in this test file.
+    EXPECT_DOUBLE_EQ(deserialize_result.get_data()[0], add_result.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(place2);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAddEmptyStringThrows) {
+    // An empty STRING value is not a decodable sketch: add() must raise CORRUPTION.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    auto empty_input = ColumnString::create();
+    empty_input->insert_data("", 0);
+    const IColumn* input_columns[1] = {empty_input.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    try {
+        agg->add(state, input_columns, 0, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testResetOnEmptyState) {
+    // reset() on a freshly created state (nothing added yet) must not throw, and the state
+    // must still report an estimate of 0 afterwards.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    // Covers the reset() branch taken while union_data is still nullptr.
+    EXPECT_NO_THROW(agg->reset(state));
+
+    ColumnFloat64 out;
+    agg->insert_result_into(state, out);
+    EXPECT_DOUBLE_EQ(out.get_data()[0], 0.0);
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryInput) {
+    // A VARBINARY argument resolves through the factory and reproduces the input sketch's
+    // estimate for a single row.
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    const int newest_version = BeExecVersionManager::get_newest_version();
+
+    DataTypes varbinary_args = {std::make_shared<DataTypeVarbinary>()};
+    const auto agg = factory.get("datasketches_hll_union_agg", varbinary_args, nullptr, false,
+                                 newest_version);
+    ASSERT_NE(agg, nullptr);
+
+    datasketches::hll_sketch input_sketch(8, datasketches::HLL_8);
+    for (int v = 20; v < 30; ++v) {
+        input_sketch.update(v);
+    }
+    const auto blob = input_sketch.serialize_compact();
+
+    auto binary_column = ColumnVarbinary::create();
+    binary_column->insert_data(reinterpret_cast<const char*>(blob.data()), blob.size());
+    const IColumn* input_columns[1] = {binary_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+    agg->add(state, input_columns, 0, *arena);
+
+    ColumnFloat64 out;
+    agg->insert_result_into(state, out);
+    EXPECT_DOUBLE_EQ(out.get_data()[0], input_sketch.get_estimate());
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryAddEmptyStringThrows) {
+    // An empty VARBINARY value is not a decodable sketch: add() must raise CORRUPTION.
+    using VarbinaryUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>;
+    DataTypes arg_types {std::make_shared<DataTypeVarbinary>()};
+    auto agg = std::make_shared<VarbinaryUnionAgg>(arg_types);
+
+    auto empty_input = ColumnVarbinary::create();
+    empty_input->insert_data("", 0);
+    const IColumn* input_columns[1] = {empty_input.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    try {
+        agg->add(state, input_columns, 0, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryCorruptedInputThrows) {
+    // A one-byte garbage payload must be rejected with CORRUPTION by both add() and
+    // deserialize() for VARBINARY input.
+    using VarbinaryUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>;
+    DataTypes arg_types {std::make_shared<DataTypeVarbinary>()};
+    auto agg = std::make_shared<VarbinaryUnionAgg>(arg_types);
+
+    auto garbage_column = ColumnVarbinary::create();
+    garbage_column->insert_data("x", 1);
+    const IColumn* input_columns[1] = {garbage_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    // add() path.
+    try {
+        agg->add(state, input_columns, 0, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // deserialize() path: the same garbage written out as a serialized state.
+    auto serialized = ColumnString::create();
+    BufferWritable writer(*serialized);
+    StringRef garbage("x", 1);
+    writer.write_binary(garbage);
+    writer.commit();
+
+    BufferReadable reader(serialized->get_data_at(0));
+    try {
+        agg->deserialize(state, reader, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinarySerializeDeserialize) {
+    // Round-trip a VARBINARY-fed state through serialize()/deserialize() and check that the
+    // restored state reports the same, plausible estimate.
+    DataTypes argument_types = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>>(argument_types);
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_varbinary = ColumnVarbinary::create();
+    column_varbinary->insert_data((const char*)ser.data(), ser.size());
+    const IColumn* columns[1] = {column_varbinary.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+    agg_func->add(place, columns, 0, *arena);
+
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+    agg_func->serialize(place, w);
+    w.commit();
+
+    AggregateDataPtr new_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(new_place);
+
+    BufferReadable r(buffer->get_data_at(0));
+    agg_func->deserialize(new_place, r, *arena);
+
+    ColumnFloat64 result1, result2;
+    agg_func->insert_result_into(place, result1);
+    agg_func->insert_result_into(new_place, result2);
+
+    // Equality alone would also pass if add() silently dropped the sketch and both states
+    // reported 0, so additionally require an estimate near the 100 distinct inputs.
+    EXPECT_GE(result1.get_data()[0], 50);
+    EXPECT_LE(result1.get_data()[0], 150);
+    EXPECT_DOUBLE_EQ(result1.get_data()[0], result2.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(new_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testSerializeDeserializeEmptyState) {
+    // A state that never received input must serialize without error and deserialize into a
+    // state whose estimate is 0.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    AggregateDataPtr empty_state =
+            arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(empty_state);
+
+    auto serialized = ColumnString::create();
+    BufferWritable writer(*serialized);
+    // Covers the empty-state branch of write().
+    EXPECT_NO_THROW(agg->serialize(empty_state, writer));
+    writer.commit();
+
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(restored_state);
+
+    BufferReadable reader(serialized->get_data_at(0));
+    agg->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 out;
+    agg->insert_result_into(restored_state, out);
+    EXPECT_DOUBLE_EQ(out.get_data()[0], 0.0);
+
+    agg->destroy(empty_state);
+    agg->destroy(restored_state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testCorruptedInputThrows) {
+    // A one-byte payload is certainly not a valid sketch: add() and deserialize() must both
+    // raise CORRUPTION for STRING input.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    auto garbage_column = ColumnString::create();
+    garbage_column->insert_data("x", 1);
+    const IColumn* input_columns[1] = {garbage_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    // Covers the CORRUPTION catch branch in add().
+    try {
+        agg->add(state, input_columns, 0, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    auto serialized = ColumnString::create();
+    BufferWritable writer(*serialized);
+    StringRef garbage("x", 1);
+    writer.write_binary(garbage);
+    writer.commit();
+
+    // Covers the CORRUPTION catch branch in read().
+    BufferReadable reader(serialized->get_data_at(0));
+    try {
+        agg->deserialize(state, reader, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAllocatorAwareSketchInput) {
+    // Bytes serialized by a sketch that uses doris::CustomStdAllocator must be accepted and
+    // reproduce that sketch's estimate.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    using ByteAllocator = doris::CustomStdAllocator<uint8_t>;
+    using AllocSketch = datasketches::hll_sketch_alloc<ByteAllocator>;
+
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    AllocSketch input_sketch(8, datasketches::HLL_8, false, ByteAllocator());
+    for (int v = 0; v < 7; ++v) {
+        input_sketch.update(v);
+    }
+    const auto blob = input_sketch.serialize_compact();
+
+    auto sketch_column = ColumnString::create();
+    sketch_column->insert_data(reinterpret_cast<const char*>(blob.data()), blob.size());
+    const IColumn* input_columns[1] = {sketch_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+    agg->add(state, input_columns, 0, *arena);
+
+    ColumnFloat64 out;
+    agg->insert_result_into(state, out);
+    EXPECT_DOUBLE_EQ(out.get_data()[0], input_sketch.get_estimate());
+
+    agg->destroy(state);
+}
+
+namespace {
+
+// Scope guard for the fault-injection tests below. On construction it sets
+// doris::config::mem_alloc_fault_probability to the given value and increments
+// doris::enable_thread_catch_bad_alloc (presumably so allocation failures on this thread
+// surface as doris::Exception — NOTE(review): confirm). The destructor undoes both, in
+// reverse order.
+class ScopedMemAllocFaultInjection {
+public:
+    explicit ScopedMemAllocFaultInjection(double probability)
+            : _saved_probability(doris::config::mem_alloc_fault_probability) {
+        doris::config::mem_alloc_fault_probability = probability;
+        ++doris::enable_thread_catch_bad_alloc;
+    }
+
+    ScopedMemAllocFaultInjection(const ScopedMemAllocFaultInjection&) = delete;
+    ScopedMemAllocFaultInjection& operator=(const ScopedMemAllocFaultInjection&) = delete;
+
+    ~ScopedMemAllocFaultInjection() {
+        --doris::enable_thread_catch_bad_alloc;
+        doris::config::mem_alloc_fault_probability = _saved_probability;
+    }
+
+private:
+    // Probability in effect before this guard was created.
+    double _saved_probability;
+};
+
+} // namespace
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMetaInfoCoversInlineMethods) {
+    // Name accessors of the data class and the function, plus the DOUBLE return type.
+    using SketchData = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING, SketchData>;
+
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    const auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    EXPECT_EQ(SketchData::get_name(), "datasketches_hll_union_agg");
+    EXPECT_EQ(agg->get_name(), "datasketches_hll_union_agg");
+
+    const auto float64_type = std::make_shared<DataTypeFloat64>();
+    EXPECT_TRUE(agg->get_return_type()->equals(*float64_type));
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAddMemAllocFailedThrowsMemAllocFailed) {
+    // With mem_alloc_fault_probability forced to 1.0, add() must report MEM_ALLOC_FAILED.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    datasketches::hll_sketch input_sketch(12, datasketches::HLL_8);
+    for (int v = 0; v < 1000; ++v) {
+        input_sketch.update(v);
+    }
+    const auto blob = input_sketch.serialize_compact();
+
+    auto sketch_column = ColumnString::create();
+    sketch_column->insert_data(reinterpret_cast<const char*>(blob.data()), blob.size());
+    const IColumn* input_columns[1] = {sketch_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    {
+        ScopedMemAllocFaultInjection always_fail(1.0);
+        try {
+            agg->add(state, input_columns, 0, *arena);
+            FAIL() << "Expected doris::Exception";
+        } catch (const doris::Exception& ex) {
+            EXPECT_EQ(ex.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+        }
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testDeserializeMemAllocFailedThrowsMemAllocFailed) {
+    // Deserializing a valid sketch with mem_alloc_fault_probability forced to 1.0 must report
+    // MEM_ALLOC_FAILED.
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    datasketches::hll_sketch input_sketch(12, datasketches::HLL_8);
+    for (int v = 0; v < 1000; ++v) {
+        input_sketch.update(v);
+    }
+    const auto blob = input_sketch.serialize_compact();
+
+    auto serialized = ColumnString::create();
+    BufferWritable writer(*serialized);
+    StringRef payload(reinterpret_cast<const char*>(blob.data()), blob.size());
+    writer.write_binary(payload);
+    writer.commit();
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    BufferReadable reader(serialized->get_data_at(0));
+    {
+        ScopedMemAllocFaultInjection always_fail(1.0);
+        try {
+            agg->deserialize(state, reader, *arena);
+            FAIL() << "Expected doris::Exception";
+        } catch (const doris::Exception& ex) {
+            EXPECT_EQ(ex.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+        }
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharAddEmptyStringThrows) {
+    // An empty VARCHAR value is not a decodable sketch: add() must raise CORRUPTION.
+    using VarcharUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg = std::make_shared<VarcharUnionAgg>(arg_types);
+
+    auto empty_input = ColumnString::create();
+    empty_input->insert_data("", 0);
+    const IColumn* input_columns[1] = {empty_input.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    try {
+        agg->add(state, input_columns, 0, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharCorruptedInputThrows) {
+    // A one-byte garbage payload must be rejected with CORRUPTION by both add() and
+    // deserialize() for VARCHAR input.
+    using VarcharUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg = std::make_shared<VarcharUnionAgg>(arg_types);
+
+    auto garbage_column = ColumnString::create();
+    garbage_column->insert_data("x", 1);
+    const IColumn* input_columns[1] = {garbage_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+
+    // add() path.
+    try {
+        agg->add(state, input_columns, 0, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // deserialize() path: the same garbage written out as a serialized state.
+    auto serialized = ColumnString::create();
+    BufferWritable writer(*serialized);
+    StringRef garbage("x", 1);
+    writer.write_binary(garbage);
+    writer.commit();
+
+    BufferReadable reader(serialized->get_data_at(0));
+    try {
+        agg->deserialize(state, reader, *arena);
+        FAIL() << "Expected doris::Exception";
+    } catch (const doris::Exception& ex) {
+        EXPECT_EQ(ex.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharSerializeDeserialize) {
+    // Round-trip a VARCHAR-fed state through serialize()/deserialize() and check that the
+    // restored state reports the same, plausible estimate.
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>>(argument_types);
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int i = 0; i < 100; i++) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+    agg_func->add(place, columns, 0, *arena);
+
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+    agg_func->serialize(place, w);
+    w.commit();
+
+    AggregateDataPtr new_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(new_place);
+
+    BufferReadable r(buffer->get_data_at(0));
+    agg_func->deserialize(new_place, r, *arena);
+
+    ColumnFloat64 result1, result2;
+    agg_func->insert_result_into(place, result1);
+    agg_func->insert_result_into(new_place, result2);
+
+    // Equality alone would also pass if add() silently dropped the sketch and both states
+    // reported 0, so additionally require an estimate near the 100 distinct inputs.
+    EXPECT_GE(result1.get_data()[0], 50);
+    EXPECT_LE(result1.get_data()[0], 150);
+    EXPECT_DOUBLE_EQ(result1.get_data()[0], result2.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(new_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testSerializeMemAllocFailedThrowsMemAllocFailed) {
+    // Populate a state normally, then serialize it with mem_alloc_fault_probability forced to
+    // 1.0: the error must be MEM_ALLOC_FAILED and mention "serialize HLL sketch".
+    using StringUnionAgg = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes arg_types {std::make_shared<DataTypeString>()};
+    auto agg = std::make_shared<StringUnionAgg>(arg_types);
+
+    datasketches::hll_sketch input_sketch(12, datasketches::HLL_8);
+    for (int v = 0; v < 2000; ++v) {
+        input_sketch.update(v);
+    }
+    const auto blob = input_sketch.serialize_compact();
+
+    auto sketch_column = ColumnString::create();
+    sketch_column->insert_data(reinterpret_cast<const char*>(blob.data()), blob.size());
+    const IColumn* input_columns[1] = {sketch_column.get()};
+
+    AggregateDataPtr state = arena->aligned_alloc(agg->size_of_data(), agg->align_of_data());
+    agg->create(state);
+    agg->add(state, input_columns, 0, *arena);
+
+    auto serialized = ColumnString::create();
+    BufferWritable writer(*serialized);
+
+    {
+        ScopedMemAllocFaultInjection always_fail(1.0);
+        try {
+            agg->serialize(state, writer);
+            FAIL() << "Expected doris::Exception";
+        } catch (const doris::Exception& ex) {
+            EXPECT_EQ(ex.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+            EXPECT_NE(ex.to_string().find("serialize HLL sketch"), std::string::npos);
+        }
+    }
+
+    agg->destroy(state);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testDataMergeDownsampleMemAllocFailed) {
+    // Merge an lg_k=12 sketch into the data, then merge an lg_k=8 sketch while
+    // mem_alloc_fault_probability is 1.0 (per the test name, this exercises the downsampling
+    // path). The failure must be MEM_ALLOC_FAILED and mention "update HLL sketch".
+    using SketchData = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using DataAlloc = SketchData::Alloc;
+    using DataSketch = SketchData::Sketch;
+
+    SketchData data;
+
+    DataSketch wide_sketch(12, datasketches::HLL_8, true, DataAlloc());
+    for (int v = 0; v < 5000; ++v) {
+        wide_sketch.update(v);
+    }
+    EXPECT_NO_THROW(data.merge(wide_sketch));
+
+    DataSketch narrow_sketch(8, datasketches::HLL_8, true, DataAlloc());
+    for (int v = 0; v < 5000; ++v) {
+        narrow_sketch.update(v);
+    }
+
+    {
+        ScopedMemAllocFaultInjection always_fail(1.0);
+        try {
+            data.merge(narrow_sketch);
+            FAIL() << "Expected doris::Exception";
+        } catch (const doris::Exception& ex) {
+            EXPECT_EQ(ex.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+            EXPECT_NE(ex.to_string().find("update HLL sketch"), std::string::npos);
+        }
+    }
+}
+
+} // namespace doris
diff --git a/build.sh b/build.sh
index 66c9836cf0a..c0cba232b2d 100755
--- a/build.sh
+++ b/build.sh
@@ -34,6 +34,7 @@ if [[ -z "${DORIS_THIRDPARTY}" ]]; then
export DORIS_THIRDPARTY="${DORIS_HOME}/thirdparty"
fi
export TP_INCLUDE_DIR="${DORIS_THIRDPARTY}/installed/include"
+export TP_INSTALLED_DIR="${DORIS_THIRDPARTY}/installed"
export TP_LIB_DIR="${DORIS_THIRDPARTY}/installed/lib"
HADOOP_DEPS_NAME="hadoop-deps"
. "${DORIS_HOME}/env.sh"
@@ -628,6 +629,14 @@ FE_MODULES="$(
# Clean and build Backend
if [[ "${BUILD_BE}" -eq 1 ]]; then
+
+    echo "install datasketches-cpp to thirdparty path before build be"
+    # NOTE(review): the archive URL tracks master, while the submodule pins a specific commit.
+    # If update_submodule falls back to this archive (presumably when the git update fails),
+    # a different datasketches-cpp revision could be built. Consider a commit-pinned archive.
+    update_submodule "contrib/datasketches-cpp" "datasketches-cpp" \
+        "https://github.com/apache/datasketches-cpp/archive/refs/heads/master.tar.gz"
+    cd "${DORIS_HOME}/contrib/datasketches-cpp"
+    # Configure and install into the thirdparty prefix. The prefix is quoted so a path
+    # containing whitespace stays a single argument (ShellCheck SC2086).
+    "${CMAKE_CMD}" -S . -B build/Release -DCMAKE_BUILD_TYPE=Release \
+        -DCMAKE_INSTALL_PREFIX="${TP_INSTALLED_DIR}" -DBUILD_TESTS=OFF
+    "${CMAKE_CMD}" --build build/Release -t install
+    cd "${DORIS_HOME}"
+
update_submodule "contrib/apache-orc" "apache-orc"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz"
update_submodule "contrib/clucene" "clucene"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz"
update_submodule "contrib/openblas" "openblas"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/openblas.tar.gz"
diff --git a/contrib/datasketches-cpp b/contrib/datasketches-cpp
new file mode 160000
index 00000000000..de8553ba372
--- /dev/null
+++ b/contrib/datasketches-cpp
@@ -0,0 +1 @@
+Subproject commit de8553ba372e618382c2e7b44b0ffc9422b9458c
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index 66140acc790..8d08bba9ce7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -38,6 +38,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.DataSketchesHllUnionAgg;
import
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
@@ -148,6 +149,8 @@ public class BuiltinAggregateFunctions implements
FunctionHelper {
agg(Histogram.class, "hist", "histogram"),
agg(HllUnion.class, "hll_raw_agg", "hll_union"),
agg(HllUnionAgg.class, "hll_union_agg"),
+ agg(DataSketchesHllUnionAgg.class,
"datasketches_hll_union_agg",
+ "ds_hll_estimate", "datasketches_hll_estimate"),
agg(IntersectCount.class, "intersect_count"),
agg(Kurt.class, "kurt", "kurt_pop", "kurtosis"),
agg(LinearHistogram.class, "linear_histogram"),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
new file mode 100644
index 00000000000..9fe46d2a77d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.agg;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.functions.FunctionTrait;
+import org.apache.doris.nereids.trees.expressions.literal.DoubleLiteral;
+import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.DoubleType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.VarBinaryType;
+import org.apache.doris.nereids.types.VarcharType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** datasketches_hll_union_agg agg function. */
+public class DataSketchesHllUnionAgg extends NotNullableAggregateFunction
+ implements UnaryExpression, ExplicitlyCastableSignature,
FunctionTrait, RollUpTrait {
+ public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
+
FunctionSignature.ret(DoubleType.INSTANCE).args(StringType.INSTANCE),
+
FunctionSignature.ret(DoubleType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT),
+
FunctionSignature.ret(DoubleType.INSTANCE).args(VarBinaryType.INSTANCE)
+ );
+
+ /**
+ * constructor with 1 argument.
+ */
+ public DataSketchesHllUnionAgg(Expression arg) {
+ super("datasketches_hll_union_agg", arg);
+ }
+
+ /**
+ * constructor with 1 argument.
+ */
+ public DataSketchesHllUnionAgg(boolean distinct, Expression arg) {
+ this(arg);
+ }
+
+ /** constructor for withChildren and reuse signature */
+ protected DataSketchesHllUnionAgg(AggregateFunctionParams functionParams) {
+ super(functionParams);
+ }
+
+ @Override
+ public void checkLegalityBeforeTypeCoercion() {
+ DataType inputType = getArgumentType(0);
+ if (!(inputType.isStringType() || inputType.isVarcharType() ||
inputType.isVarBinaryType()
+ || inputType.isNullType())) {
+ throw new AnalysisException(getName()
+ + " function's argument should be of STRING/VARCHAR/VARBINARY
type, but was " + inputType);
+ }
+ }
+
+ @Override
+ protected List<DataType> intermediateTypes() {
+ return ImmutableList.of(StringType.INSTANCE);
+ }
+
+ @Override
+ public List<FunctionSignature> getSignatures() {
+ return SIGNATURES;
+ }
+
+ @Override
+ public DataSketchesHllUnionAgg withDistinctAndChildren(boolean distinct,
List<Expression> children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new DataSketchesHllUnionAgg(getFunctionParams(distinct,
children));
+ }
+
+ @Override
+ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+ return visitor.visitDataSketchesHllUnionAgg(this, context);
+ }
+
+ @Override
+ public Function constructRollUp(Expression param, Expression... varParams)
{
+ return new
DataSketchesHllUnionAgg(getFunctionParams(ImmutableList.of(param)));
+ }
+
+ @Override
+ public boolean canRollUp() {
+ return false;
+ }
+
+ @Override
+ public Expression resultForEmptyInput() {
+ return new DoubleLiteral(0);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 251012c16fe..5292a967c27 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -39,6 +39,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.DataSketchesHllUnionAgg;
import
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
import
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
@@ -238,6 +239,10 @@ public interface AggregateFunctionVisitor<R, C> {
return visitAggregateFunction(histogram, context);
}
+    // By default, visiting datasketches_hll_union_agg defers to the generic aggregate visit.
+    default R visitDataSketchesHllUnionAgg(DataSketchesHllUnionAgg dataSketchesHllUnionAgg, C context) {
+        return visitAggregateFunction(dataSketchesHllUnionAgg, context);
+    }
+
default R visitHllUnion(HllUnion hllUnion, C context) {
return visitAggregateFunction(hllUnion, context);
}
diff --git
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
new file mode 100644
index 00000000000..ae1424305c7
--- /dev/null
+++
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
@@ -0,0 +1,28 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !basic_union --
+17
+
+-- !aliases --
+17 17 17
+
+-- !group_by --
+1 7
+2 10
+
+-- !distinct --
+17 17
+
+-- !basic_union_varchar --
+17
+
+-- !aliases_varchar --
+17 17 17
+
+-- !basic_union_varbinary --
+17
+
+-- !aliases_varbinary --
+17 17 17
+
+-- !empty_input --
+0
diff --git
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
new file mode 100644
index 00000000000..8d86b8c82d6
--- /dev/null
+++
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_datasketches_hll_union_agg") {
+ def tableName = "test_datasketches_hll_union_agg_tbl"
+ def varcharTableName = "test_datasketches_hll_union_agg_varchar_tbl"
+ def emptyTableName = "test_datasketches_hll_union_agg_empty_tbl"
+ def badTableName = "test_datasketches_hll_union_agg_bad_tbl"
+
+ // sk = new HllSketch(8, HLL_8); for (int i = 0; i < 7; i++) sk.update(i);
+ def sk1Base64 = "AgEHCAMIBwjL18IEK/L7BoYv+Q11gWYHgbxdBntl5gj8LUIK"
+
+ // sk = new HllSketch(8, HLL_8); for (int i = 20; i < 30; i++) sk.update(i);
+ def sk2Base64 = "AwEHCAUIAAkKAAAAIjvrBcS1nwfGGWoEyHokBO8t9wc1qTEENkcJB7hWqQxZf9QNnuSbGA=="
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE ${tableName} (
+ id INT,
+ sk STRING
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """
+ INSERT INTO ${tableName} VALUES
+ (1, from_base64('${sk1Base64}')),
+ (2, from_base64('${sk2Base64}')),
+ (3, NULL)
+ """
+
+ // 1) Basic union: {0..6} U {20..29} => 17 distinct values
+ qt_basic_union """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM ${tableName}"""
+
+ // 2) Aliases should behave identically
+ qt_aliases """SELECT
+ CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+ CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+ CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+ FROM ${tableName}
+ """
+
+ // 3) Group-by
+ qt_group_by """SELECT id, CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT)
+ FROM ${tableName}
+ WHERE id IN (1, 2)
+ GROUP BY id
+ ORDER BY id
+ """
+
+ // 4) DISTINCT should not change result in this data set
+ sql "INSERT INTO ${tableName} VALUES (5, from_base64('${sk1Base64}'))"
+ qt_distinct """SELECT
+ CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+ CAST(ROUND(datasketches_hll_union_agg(DISTINCT sk)) AS BIGINT)
+ FROM ${tableName}
+ """
+
+ // 4.1) Input type coverage: VARCHAR
+ sql "DROP TABLE IF EXISTS ${varcharTableName}"
+ sql """
+ CREATE TABLE ${varcharTableName} (
+ id INT,
+ sk VARCHAR(256)
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """
+ INSERT INTO ${varcharTableName} VALUES
+ (1, from_base64('${sk1Base64}')),
+ (2, from_base64('${sk2Base64}')),
+ (3, NULL)
+ """
+
+ qt_basic_union_varchar """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM ${varcharTableName}"""
+
+ qt_aliases_varchar """SELECT
+ CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+ CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+ CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+ FROM ${varcharTableName}
+ """
+
+ // 4.2) Input type coverage: VARBINARY (no table column; VARBINARY is not supported for table storage)
+ qt_basic_union_varbinary """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM (
+ SELECT from_base64_binary('${sk1Base64}') AS sk
+ UNION ALL SELECT from_base64_binary('${sk2Base64}')
+ UNION ALL SELECT NULL
+ ) t
+ """
+
+ qt_aliases_varbinary """SELECT
+ CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+ CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+ CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+ FROM (
+ SELECT from_base64_binary('${sk1Base64}') AS sk
+ UNION ALL SELECT from_base64_binary('${sk2Base64}')
+ UNION ALL SELECT NULL
+ ) t
+ """
+
+ // 5) Empty input should return 0
+ sql "DROP TABLE IF EXISTS ${emptyTableName}"
+ sql """
+ CREATE TABLE ${emptyTableName} (
+ id INT,
+ sk STRING
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+ qt_empty_input """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM ${emptyTableName}"""
+
+ // 6) Illegal input should throw (base64 is valid but bytes are not a datasketches HLL sketch)
+ test {
+ sql """SELECT datasketches_hll_union_agg(from_base64('AA=='))"""
+ exception "CORRUPTION"
+ }
+ test {
+ sql """SELECT ds_hll_estimate(from_base64('AA=='))"""
+ exception "CORRUPTION"
+ }
+ test {
+ sql """SELECT datasketches_hll_estimate(from_base64('AA=='))"""
+ exception "CORRUPTION"
+ }
+
+ // Empty string is a valid STRING value, but it is an invalid serialized DataSketches HLL sketch.
+ // It should not fail at INSERT time; it should fail when the aggregate function reads it.
+ sql "DROP TABLE IF EXISTS ${badTableName}"
+ sql """
+ CREATE TABLE ${badTableName} (
+ id INT,
+ sk STRING
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+ sql """INSERT INTO ${badTableName} VALUES (1, '')"""
+ test {
+ sql """SELECT datasketches_hll_union_agg(sk) FROM ${badTableName}"""
+ exception "CORRUPTION"
+ }
+}
diff --git a/run-be-ut.sh b/run-be-ut.sh
index c9579ead354..1779edf05d7 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -41,7 +41,12 @@ ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
export ROOT
export DORIS_HOME="${ROOT}"
-
+if [[ -z "${DORIS_THIRDPARTY}" ]]; then
+ export DORIS_THIRDPARTY="${DORIS_HOME}/thirdparty"
+fi
+export TP_INCLUDE_DIR="${DORIS_THIRDPARTY}/installed/include"
+export TP_INSTALLED_DIR="${DORIS_THIRDPARTY}/installed"
+export TP_LIB_DIR="${DORIS_THIRDPARTY}/installed/lib"
. "${DORIS_HOME}/env.sh"
# Check args
@@ -174,6 +179,13 @@ update_submodule() {
fi
}
+echo "install datasketches-cpp to thirdparty path before build backend ut"
+update_submodule "contrib/datasketches-cpp" "datasketches-cpp" "https://github.com/apache/datasketches-cpp/archive/refs/heads/master.tar.gz"
+cd "${DORIS_HOME}/contrib/datasketches-cpp"
+"${CMAKE_CMD}" -S . -B build/Release -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=$TP_INSTALLED_DIR -DBUILD_TESTS=OFF
+"${CMAKE_CMD}" --build build/Release -t install
+cd "${DORIS_HOME}"
+
update_submodule "contrib/apache-orc" "apache-orc" "https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz"
update_submodule "contrib/clucene" "clucene" "https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]