This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 650536d [Feature] Add Topn udaf (#4803)
650536d is described below
commit 650536d53efc637bb297addc6c061890f2a70434
Author: Youngwb <[email protected]>
AuthorDate: Wed Dec 16 21:58:34 2020 +0800
[Feature] Add Topn udaf (#4803)
For #4674
This is a UDAF for approximate TopN based on the Space-Saving algorithm. At
present it can only compute the frequent items of a column and their
frequencies; building on this, we can implement TopN functions similar to
those supported by Kylin in the future.
I have also added a test that measures the accuracy of this algorithm. A
rough summary of the results follows.
The total amount of data is 1 million rows following a Zipfian
distribution. "Element cardinality" is the number of distinct values in the
data. The columns 20X, 50X and 100X correspond to space_expand_rate values
of 20, 50 and 100. space_expand_rate determines how many counters the
Space-Saving algorithm keeps (top_num * space_expand_rate).
```
zf exponent = 0.5
Element cardinality 20X 50X 100X
1000 100% 100% 100%
10000 100% 100% 100%
100000 100% 100% 100%
500000 94% 98% 99%
zf exponent = 0.6 and 1.0
Element cardinality 20X 50X 100X
1000 100% 100% 100%
10000 100% 100% 100%
100000 100% 100% 100%
500000 100% 100% 100%
```
---
be/src/common/daemon.cpp | 2 +
be/src/exprs/CMakeLists.txt | 3 +-
be/src/exprs/topn_function.cpp | 119 ++++++++++
be/src/exprs/topn_function.h | 47 ++++
be/src/util/CMakeLists.txt | 1 +
be/src/util/topn_counter.cpp | 142 +++++++++++
be/src/util/topn_counter.h | 182 +++++++++++++++
be/test/exprs/CMakeLists.txt | 1 +
be/test/exprs/topn_function_test.cpp | 259 +++++++++++++++++++++
be/test/exprs/zipf_distribution.h | 120 ++++++++++
.../sql-functions/aggregate-functions/topn.md | 61 +++++
.../sql-functions/aggregate-functions/topn.md | 60 +++++
.../apache/doris/analysis/FunctionCallExpr.java | 25 ++
.../java/org/apache/doris/catalog/FunctionSet.java | 84 +++++++
gensrc/proto/olap_common.proto | 10 +
15 files changed, 1115 insertions(+), 1 deletion(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c05ef99..e57d027 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -41,6 +41,7 @@
#include "exprs/string_functions.h"
#include "exprs/time_operators.h"
#include "exprs/timestamp_functions.h"
+#include "exprs/topn_function.h"
#include "exprs/utility_functions.h"
#include "geo/geo_functions.h"
#include "olap/options.h"
@@ -261,6 +262,7 @@ void Daemon::init(int argc, char** argv, const
std::vector<StorePath>& paths) {
BitmapFunctions::init();
HllFunctions::init();
HashFunctions::init();
+ TopNFunctions::init();
LOG(INFO) << CpuInfo::debug_string();
LOG(INFO) << DiskInfo::debug_string();
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 9194724..bc179e5 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -65,4 +65,5 @@ add_library(Exprs
new_agg_fn_evaluator.cc
bitmap_function.cpp
hll_function.cpp
- grouping_sets_functions.cpp)
+ grouping_sets_functions.cpp
+ topn_function.cpp)
diff --git a/be/src/exprs/topn_function.cpp b/be/src/exprs/topn_function.cpp
new file mode 100644
index 0000000..5ea160f
--- /dev/null
+++ b/be/src/exprs/topn_function.cpp
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/topn_function.h"
+#include "util/topn_counter.h"
+#include "util/slice.h"
+
+namespace doris {
+
+using doris_udf::AnyVal;
+
+// Start-up hook called from Daemon::init(). TopN needs no start-up
+// registration, so the body is intentionally empty.
+void TopNFunctions::init() {}
+
+// Allocates the per-group TopNCounter and stores its address in `dst`.
+// The optional third aggregate argument (space_expand_rate) is read from the
+// constant arguments. When it is absent, a NULL constant or not strictly
+// positive, the counter uses DEFAULT_SPACE_EXPAND_RATE.
+void TopNFunctions::topn_init(FunctionContext* ctx, StringVal* dst) {
+    dst->is_null = false;
+    dst->len = sizeof(TopNCounter);
+
+    uint32_t space_expand_rate = DEFAULT_SPACE_EXPAND_RATE;
+    const AnyVal* space_expand_rate_val = ctx->get_constant_arg(2);
+    if (space_expand_rate_val != nullptr) {
+        const IntVal* rate = reinterpret_cast<const IntVal*>(space_expand_rate_val);
+        // A NULL constant carries no meaningful `val`. A negative value would
+        // wrap to a huge uint32_t and blow up the counter capacity. Only
+        // strictly positive rates are therefore honoured.
+        if (!rate->is_null && rate->val > 0) {
+            space_expand_rate = static_cast<uint32_t>(rate->val);
+        }
+    }
+    dst->ptr = reinterpret_cast<uint8_t*>(new TopNCounter(space_expand_rate));
+}
+
+// Counts one occurrence of `src` in the group's TopNCounter. NULL inputs are
+// ignored. set_top_num() is applied on every non-NULL row because `topn`
+// arrives as an ordinary per-row argument.
+template <typename T>
+void TopNFunctions::topn_update(FunctionContext*, const T& src, const IntVal& topn,
+                                StringVal* dst) {
+    if (!src.is_null) {
+        TopNCounter* counter = reinterpret_cast<TopNCounter*>(dst->ptr);
+        counter->set_top_num(topn.val);
+        counter->add_item(src);
+    }
+}
+
+// Overload for topn(expr, n, space_expand_rate). The per-row rate value is
+// not used here: topn_init() reads it from constant argument 2 when the
+// counter is created. This overload therefore just forwards to the
+// two-argument form.
+template <typename T>
+void TopNFunctions::topn_update(FunctionContext* ctx, const T& src, const IntVal& topn,
+                                const IntVal& /*space_expand_rate*/, StringVal* dst) {
+    topn_update(ctx, src, topn, dst);
+}
+
+// Deserializes a partial state (bytes from topn_serialize) and merges it into
+// the counter owned by `dst`. A NULL partial state is skipped.
+void TopNFunctions::topn_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+    if (!src.is_null) {
+        TopNCounter* merged = reinterpret_cast<TopNCounter*>(dst->ptr);
+        merged->merge(TopNCounter(Slice(src.ptr, src.len)));
+    }
+}
+
+// Encodes the counter (via TopNCounter::serialize) into a ctx-allocated
+// StringVal. The heap TopNCounter is released here.
+StringVal TopNFunctions::topn_serialize(FunctionContext* ctx, const StringVal& src) {
+    TopNCounter* counter = reinterpret_cast<TopNCounter*>(src.ptr);
+    std::string bytes;
+    counter->serialize(&bytes);
+    delete counter;
+
+    StringVal serialized(ctx, bytes.size());
+    memcpy(serialized.ptr, bytes.data(), bytes.size());
+    return serialized;
+}
+
+// Renders the final JSON result ({"item":count,...}) into a ctx-allocated
+// StringVal and releases the heap TopNCounter.
+StringVal TopNFunctions::topn_finalize(FunctionContext* ctx, const StringVal& src) {
+    TopNCounter* counter = reinterpret_cast<TopNCounter*>(src.ptr);
+    std::string json;
+    counter->finalize(json);
+    delete counter;
+
+    StringVal output(ctx, json.size());
+    memcpy(output.ptr, json.data(), json.size());
+    return output;
+}
+
+// Explicit instantiations. The topn_update templates are defined in this .cpp,
+// so every input type must be instantiated here, once for
+// topn(expr, n) and once for topn(expr, n, space_expand_rate).
+// NOTE(review): presumably this list must match the signatures registered on
+// the FE side — keep the two in sync.
+template void TopNFunctions::topn_update(FunctionContext*, const BooleanVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const BooleanVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const TinyIntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const TinyIntVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const SmallIntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const SmallIntVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const IntVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const BigIntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const BigIntVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const FloatVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const FloatVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const DoubleVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DoubleVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const StringVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const StringVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const DateTimeVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DateTimeVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const LargeIntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const LargeIntVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const DecimalVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DecimalVal&, const IntVal&, const IntVal&, StringVal*);
+
+template void TopNFunctions::topn_update(FunctionContext*, const DecimalV2Val&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DecimalV2Val&, const IntVal&, const IntVal&, StringVal*);
+
+}
\ No newline at end of file
diff --git a/be/src/exprs/topn_function.h b/be/src/exprs/topn_function.h
new file mode 100644
index 0000000..b50700e
--- /dev/null
+++ b/be/src/exprs/topn_function.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H
+#define DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H
+
+#include "udf/udf.h"
+
+namespace doris {
+
+// Entry points of the approximate `topn` aggregate (Space-Saving based).
+// The intermediate state is a heap-allocated TopNCounter whose address is
+// kept in the StringVal's `ptr`.
+class TopNFunctions {
+public:
+    // Start-up hook (currently a no-op).
+    static void init();
+
+    // Creates the per-group TopNCounter.
+    static void topn_init(FunctionContext* ctx, StringVal* dst);
+
+    // topn(expr, n): counts one non-NULL value.
+    template <typename T>
+    static void topn_update(FunctionContext* ctx, const T& src, const IntVal& topn,
+                            StringVal* dst);
+
+    // topn(expr, n, space_expand_rate): same as above; the rate is taken from
+    // the constant arguments in topn_init().
+    template <typename T>
+    static void topn_update(FunctionContext* ctx, const T& src, const IntVal& topn,
+                            const IntVal& space_expand_rate, StringVal* dst);
+
+    // Merges a serialized partial state into `dst`.
+    static void topn_merge(FunctionContext* ctx, const StringVal& src, StringVal* dst);
+
+    // Serializes the counter to bytes and frees it.
+    static StringVal topn_serialize(FunctionContext* ctx, const StringVal& src);
+
+    // Renders the final JSON result and frees the counter.
+    static StringVal topn_finalize(FunctionContext* ctx, const StringVal& src);
+};
+
+}
+
+#endif //DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index adb21a6..ce1cd19 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -102,6 +102,7 @@ set(UTIL_FILES
brpc_stub_cache.cpp
zlib.cpp
pprof_utils.cpp
+ topn_counter.cpp
)
if (WITH_MYSQL)
diff --git a/be/src/util/topn_counter.cpp b/be/src/util/topn_counter.cpp
new file mode 100644
index 0000000..bb6052f
--- /dev/null
+++ b/be/src/util/topn_counter.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <rapidjson/writer.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "gen_cpp/olap_common.pb.h"
+#include "topn_counter.h"
+#include "slice.h"
+
+namespace doris {
+
+// Adds `incrementCount` occurrences of the string key `item`. A new key
+// starts at `incrementCount`.
+// NOTE(review): nothing is evicted here. The map holds every distinct key
+// until sort_retain() trims it (serialize/merge/finalize).
+void TopNCounter::add_item(const std::string& item, uint64_t incrementCount) {
+    auto found = _counter_map->find(item);
+    if (found == _counter_map->end()) {
+        _counter_map->insert(std::make_pair(item, Counter(item, incrementCount)));
+    } else {
+        found->second.add_count(incrementCount);
+    }
+    _ordered = false;
+}
+
+// Trims the counter to `_capacity` entries, then writes top_num,
+// space_expand_rate and the retained (item, count) pairs as a PTopNCounter
+// message. Pairs are written in descending count order.
+void TopNCounter::serialize(std::string* buffer) {
+    sort_retain(_capacity);
+
+    PTopNCounter pb;
+    pb.set_top_num(_top_num);
+    pb.set_space_expand_rate(_space_expand_rate);
+    for (const Counter& c : *_counter_vec) {
+        PCounter* entry = pb.add_counter();
+        entry->set_item(c.get_item());
+        entry->set_count(c.get_count());
+    }
+    pb.SerializeToString(buffer);
+}
+
+// Rebuilds this counter from a PTopNCounter produced by serialize().
+// Existing contents are discarded first so that _counter_map and _counter_vec
+// always describe the same items. Previously, calling this on a non-empty
+// counter appended duplicates to the vector while the map kept the old
+// counts. On a parse error it logs, returns false and leaves the counter
+// unchanged.
+bool TopNCounter::deserialize(const doris::Slice& src) {
+    PTopNCounter topn_counter;
+    if (!topn_counter.ParseFromArray(src.data, src.size)) {
+        LOG(WARNING) << "topn counter deserialize failed";
+        return false;
+    }
+
+    _counter_map->clear();
+    _counter_vec->clear();
+    _counter_vec->reserve(topn_counter.counter_size());
+
+    _space_expand_rate = topn_counter.space_expand_rate();
+    set_top_num(topn_counter.top_num());
+    for (int i = 0; i < topn_counter.counter_size(); ++i) {
+        const PCounter& counter = topn_counter.counter(i);
+        _counter_map->insert(std::make_pair(counter.item(), Counter(counter.item(), counter.count())));
+        _counter_vec->emplace_back(counter.item(), counter.count());
+    }
+    // serialize() writes the counters already sorted by descending count.
+    _ordered = true;
+    return true;
+}
+
+// Rebuilds _counter_vec as the `capacity` highest-count items, drops every
+// other item from _counter_map, and marks the counter as ordered.
+void TopNCounter::sort_retain(uint32_t capacity) {
+    std::vector<Counter>* target = _counter_vec;
+    target->clear();
+    sort_retain(capacity, target);
+    _ordered = true;
+}
+
+// Appends every (item, count) from _counter_map to `sort_vec` and sorts it
+// with TopNComparator. If `sort_vec` then holds more than `capacity` entries,
+// the lowest-ranked tail is removed from both `sort_vec` and _counter_map.
+void TopNCounter::sort_retain(uint32_t capacity, std::vector<Counter>* sort_vec) {
+    for (const auto& entry : *_counter_map) {
+        sort_vec->emplace_back(entry.second.get_item(), entry.second.get_count());
+    }
+    std::sort(sort_vec->begin(), sort_vec->end(), TopNComparator());
+
+    while (sort_vec->size() > capacity) {
+        _counter_map->erase(sort_vec->back().get_item());
+        sort_vec->pop_back();
+    }
+}
+
+// Based on the parallel version of the Space Saving algorithm as described in:
+// A parallel space saving algorithm for frequent items and the Hurwitz zeta
+// distribution by Massimo Cafaro, et al.
+//
+// Merges `other` into this counter. When a side is full (holds at least its
+// capacity of counters), its smallest retained count bounds the count of any
+// item it dropped: m1 for this side, m2 for `other`. Items missing from a full
+// side are credited with that minimum. Items present on both sides end up with
+// the sum of their counts. The result is trimmed back to `_capacity`.
+void TopNCounter::merge(doris::TopNCounter&& other) {
+    if (other._counter_map->empty()) {
+        return;
+    }
+
+    _space_expand_rate = other._space_expand_rate;
+    set_top_num(other._top_num);
+    bool this_full = _counter_map->size() >= _capacity;
+    bool another_full = other._counter_map->size() >= other._capacity;
+
+    // The minima are read from _counter_vec, which is only a valid sorted
+    // snapshot after sort_retain(). If items were added via add_item() since
+    // the last sort, the vector is stale (possibly empty). back() would then
+    // return a wrong value or be undefined behaviour, so sort first.
+    if (this_full && !_ordered) {
+        sort_retain(_capacity);
+    }
+    if (another_full && !other._ordered) {
+        other.sort_retain(other._capacity);
+    }
+
+    uint64_t m1 = (this_full && !_counter_vec->empty()) ? _counter_vec->back().get_count() : 0;
+    uint64_t m2 = (another_full && !other._counter_vec->empty())
+                          ? other._counter_vec->back().get_count()
+                          : 0;
+
+    if (another_full) {
+        for (auto& entry : *_counter_map) {
+            entry.second.add_count(m2);
+        }
+    }
+
+    for (auto& other_entry : *other._counter_map) {
+        auto itr = _counter_map->find(other_entry.first);
+        if (itr != _counter_map->end()) {
+            // Present on both sides: take back the m2 credit added above
+            // (m2 <= any retained count in `other`, so this cannot underflow).
+            itr->second.add_count(other_entry.second.get_count() - m2);
+        } else {
+            _counter_map->insert(std::make_pair(
+                    other_entry.first,
+                    Counter(other_entry.first, other_entry.second.get_count() + m1)));
+        }
+    }
+    _ordered = false;
+    sort_retain(_capacity);
+}
+
+// Writes the top `_top_num` items as a JSON object {"item": count, ...} in
+// descending count order into `finalize_str`.
+void TopNCounter::finalize(std::string& finalize_str) {
+    if (!_ordered) {
+        sort_retain(_top_num);
+    }
+    // use json format print
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    uint32_t k = 0;
+    writer.StartObject();
+    // After merge() the vector may hold up to _capacity entries, so stop at
+    // _top_num explicitly.
+    for (auto it = _counter_vec->begin(); it != _counter_vec->end() && k < _top_num; ++it, ++k) {
+        const std::string& item = it->get_item();
+        // Pass the length explicitly. Items come from arbitrary column data and
+        // may contain '\0', which the NUL-terminated Key() overload would cut off.
+        writer.Key(item.data(), static_cast<rapidjson::SizeType>(item.size()));
+        writer.Uint64(it->get_count());
+    }
+    writer.EndObject();
+    finalize_str = buffer.GetString();
+}
+
+}
diff --git a/be/src/util/topn_counter.h b/be/src/util/topn_counter.h
new file mode 100644
index 0000000..f8dc584
--- /dev/null
+++ b/be/src/util/topn_counter.h
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_UTI_TOPN_COUNTER_H
+#define DORIS_BE_SRC_UTI_TOPN_COUNTER_H
+
+#include <cstdint>
+#include <list>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
+#include "runtime/datetime_value.h"
+#include "runtime/decimal_value.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/large_int_value.h"
+#include "udf/udf.h"
+
+namespace doris {
+
+static const uint32_t DEFAULT_SPACE_EXPAND_RATE = 50;
+
+class Slice;
+
+// One (item, count) pair tracked by TopNCounter.
+class Counter {
+public:
+    Counter() = default;
+
+    Counter(const std::string& item, uint64_t count) : _item(item), _count(count) {}
+
+    uint64_t get_count() const { return _count; }
+
+    const std::string& get_item() const { return _item; }
+
+    void add_count(uint64_t count) { _count += count; }
+
+    // Equal when both the item and its count match. Marked const so it also
+    // works on const Counters and inside standard algorithms.
+    bool operator==(const Counter& other) const {
+        return _item == other._item && _count == other._count;
+    }
+
+private:
+    std::string _item;
+    // Initialised here because a default-initialised Counter (`Counter c;`)
+    // previously left _count indeterminate.
+    uint64_t _count = 0;
+};
+
+
+// Refer to TopNCounter.java in https://github.com/apache/kylin
+// Based on the Space-Saving algorithm and the Stream-Summary data structure as described in:
+// Efficient Computation of Frequent and Top-k Elements in Data Streams by
+// Metwally, Agrawal, and Abbadi
+//
+// Values of every supported type are converted to a string key and counted in
+// _counter_map. sort_retain() builds the count-ordered view in _counter_vec and
+// trims the map to a given capacity.
+//
+// The map and the vector are owned through raw pointers, so the class is
+// non-copyable. An implicitly generated copy would share both pointers and
+// the destructor would delete them twice.
+class TopNCounter {
+public:
+    TopNCounter(uint32_t space_expand_rate = DEFAULT_SPACE_EXPAND_RATE)
+            : _top_num(0),
+              _space_expand_rate(space_expand_rate),
+              _capacity(0),
+              _ordered(false),
+              _counter_map(new std::unordered_map<std::string, Counter>(_capacity)),
+              // Start empty. `vector<Counter>(_capacity)` would create
+              // _capacity default Counters; it only worked because _capacity is 0.
+              _counter_vec(new std::vector<Counter>()) {}
+
+    // Rebuilds a counter from the bytes written by serialize().
+    TopNCounter(const Slice& src)
+            : _top_num(0),
+              _space_expand_rate(0),
+              _capacity(0),
+              _ordered(false),
+              _counter_map(new std::unordered_map<std::string, Counter>(_capacity)),
+              _counter_vec(new std::vector<Counter>()) {
+        bool res = deserialize(src);
+        DCHECK(res);
+    }
+
+    TopNCounter(const TopNCounter&) = delete;
+    TopNCounter& operator=(const TopNCounter&) = delete;
+
+    ~TopNCounter() {
+        delete _counter_map;
+        delete _counter_vec;
+    }
+
+    // Counts one occurrence of `item`.
+    template <typename T>
+    void add_item(const T& item) {
+        add_item(item, 1);
+    }
+
+    // Typed overloads: each converts the value to its string key.
+    void add_item(const BooleanVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const TinyIntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const SmallIntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const IntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const BigIntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const FloatVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const DoubleVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const StringVal& item, uint64_t incrementCount) {
+        add_item(std::string((char*) item.ptr, item.len), incrementCount);
+    }
+    void add_item(const DateTimeVal& item, uint64_t incrementCount) {
+        char str[MAX_DTVALUE_STR_LEN];
+        DateTimeValue::from_datetime_val(item).to_string(str);
+        add_item(std::string(str), incrementCount);
+    }
+    void add_item(const LargeIntVal& item, uint64_t incrementCount) {
+        add_item(LargeIntValue::to_string(item.val), incrementCount);
+    }
+    void add_item(const DecimalVal& item, uint64_t incrementCount) {
+        add_item(DecimalValue::from_decimal_val(item).to_string(), incrementCount);
+    }
+    void add_item(const DecimalV2Val& item, uint64_t incrementCount) {
+        add_item(DecimalV2Value::from_decimal_val(item).to_string(), incrementCount);
+    }
+
+    template <typename T>
+    void add_item_numeric(const T& item, uint64_t incrementCount) {
+        // NOTE(review): std::to_string formats FloatVal/DoubleVal with "%f"
+        // (6 decimals), so distinct values such as 1e-7 and 2e-7 share one key.
+        add_item(std::to_string(item.val), incrementCount);
+    }
+
+    // Counts `incrementCount` occurrences of the string key `item`.
+    void add_item(const std::string& item, uint64_t incrementCount);
+
+    // Trims to _capacity and writes the state as a PTopNCounter message.
+    void serialize(std::string* buffer);
+
+    // Loads state written by serialize(); returns false on a parse error.
+    bool deserialize(const Slice& src);
+
+    // Parallel Space-Saving merge of `other` into this counter.
+    void merge(doris::TopNCounter&& other);
+
+    // Sort counter by count value and record it in _counter_vec
+    void sort_retain(uint32_t capacity);
+
+    // Appends the map's counters to `sort_vec`, sorts them and trims both
+    // `sort_vec` and the map to `capacity` entries.
+    void sort_retain(uint32_t capacity, std::vector<Counter>* sort_vec);
+
+    // Writes the top `_top_num` items as a JSON object.
+    void finalize(std::string&);
+
+    void set_top_num(uint32_t top_num) {
+        _top_num = top_num;
+        // Multiply in 64 bits: top_num * _space_expand_rate can exceed
+        // UINT32_MAX and previously wrapped around. The result is clamped
+        // because sort_retain() takes a uint32_t capacity.
+        uint64_t capacity = static_cast<uint64_t>(top_num) * _space_expand_rate;
+        _capacity = capacity > UINT32_MAX ? UINT32_MAX : capacity;
+    }
+
+private:
+    // Number of items reported by finalize().
+    uint32_t _top_num;
+    // Counters kept per reported item; _capacity = _top_num * _space_expand_rate.
+    uint32_t _space_expand_rate;
+    uint64_t _capacity;
+    // True while _counter_vec is a sorted snapshot of _counter_map.
+    bool _ordered;
+    std::unordered_map<std::string, Counter>* _counter_map;
+    std::vector<Counter>* _counter_vec;
+};
+
+// Orders counters by descending count. Ties are broken by ascending item, so
+// the order, and therefore which equal-count items sort_retain() drops, no
+// longer depends on unordered_map iteration order. The result is a strict
+// weak ordering usable with std::sort.
+class TopNComparator {
+public:
+    bool operator()(const Counter& s1, const Counter& s2) const {
+        if (s1.get_count() != s2.get_count()) {
+            return s1.get_count() > s2.get_count();
+        }
+        return s1.get_item() < s2.get_item();
+    }
+};
+}
+
+#endif //DORIS_BE_SRC_UTI_TOPN_COUNTER_H
diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt
index fef8172..8c393aa 100644
--- a/be/test/exprs/CMakeLists.txt
+++ b/be/test/exprs/CMakeLists.txt
@@ -34,4 +34,5 @@ ADD_BE_TEST(hll_function_test)
ADD_BE_TEST(encryption_functions_test)
#ADD_BE_TEST(in-predicate-test)
ADD_BE_TEST(math_functions_test)
+ADD_BE_TEST(topn_function_test)
diff --git a/be/test/exprs/topn_function_test.cpp
b/be/test/exprs/topn_function_test.cpp
new file mode 100644
index 0000000..f13c2e2
--- /dev/null
+++ b/be/test/exprs/topn_function_test.cpp
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/anyval_util.h"
+#include "exprs/topn_function.h"
+#include "util/topn_counter.h"
+#include "testutil/function_utils.h"
+#include "zipf_distribution.h"
+
+#include <gtest/gtest.h>
+#include <unordered_map>
+
+
+namespace doris {
+
+// Accuracy-test parameters: requested N, rows generated per run, and the
+// number of partial aggregation states merged at the end.
+static constexpr uint32_t TOPN_NUM = 100;
+static constexpr uint32_t TOTAL_RECORDS = 1000000;
+static constexpr uint32_t PARALLEL = 10;
+
+// Returns a random string of `len` characters drawn uniformly from [0-9A-Za-z].
+std::string gen_random(const int len) {
+    const std::string alphabet =
+            "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    std::random_device seed_source;
+    std::mt19937 engine(seed_source());
+    std::uniform_int_distribution<> pick(0, alphabet.size() - 1);
+
+    std::string result;
+    result.reserve(len);
+    for (int i = 0; i < len; ++i) {
+        result.push_back(alphabet[pick(engine)]);
+    }
+    return result;
+}
+
+// gtest fixture that provides a FunctionContext obtained from FunctionUtils.
+class TopNFunctionsTest : public testing::Test {
+public:
+    TopNFunctionsTest() = default;
+
+    void SetUp() override {
+        utils = new FunctionUtils();
+        ctx = utils->get_fn_ctx();
+    }
+
+    void TearDown() override {
+        delete utils;
+        utils = nullptr;
+        // ctx came from utils; do not keep a pointer past its owner.
+        ctx = nullptr;
+    }
+
+protected:
+    // protected (not private): TEST_F bodies are subclasses of this fixture
+    // and read `ctx` directly.
+    FunctionUtils* utils = nullptr;
+    FunctionContext* ctx = nullptr;
+};
+
+// Records one exact occurrence of `item`; this is the ground truth that the
+// approximate counter is compared against.
+void update_accuracy_map(const std::string& item,
+                         std::unordered_map<std::string, uint32_t>& accuracy_map) {
+    // operator[] value-initialises a missing entry to 0 before the increment.
+    ++accuracy_map[item];
+}
+
+// Feeds one value into a partial TopN state and into the exact reference map.
+void topn_single(FunctionContext* ctx, std::string& random_str, StringVal& dst,
+                 std::unordered_map<std::string, uint32_t>& accuracy_map) {
+    StringVal value(reinterpret_cast<uint8_t*>(&random_str[0]), random_str.length());
+    TopNFunctions::topn_update(ctx, value, TOPN_NUM, &dst);
+    update_accuracy_map(random_str, accuracy_map);
+}
+
+// Draws TOTAL_RECORDS zipf-distributed keys and spreads them over PARALLEL
+// partial TopN states. It then merges the states and logs how many of the
+// top TOPN_NUM counts differ from the exact counts.
+void test_topn_accuracy(FunctionContext* ctx, int key_space, int space_expand_rate,
+                        double zipf_distribution_exponent) {
+    LOG(INFO) << "topn accuracy : " << "key space : " << key_space << " , space_expand_rate : " << space_expand_rate
+              << " , zf exponent : " << zipf_distribution_exponent;
+    std::unordered_map<std::string, uint32_t> accuracy_map;
+    // prepare random data
+    std::vector<std::string> random_strs(key_space);
+    for (int i = 0; i < key_space; ++i) {
+        random_strs[i] = gen_random(10);
+    }
+
+    zipf_distribution<uint64_t, double> zf(key_space, zipf_distribution_exponent);
+    std::random_device rd;
+    std::mt19937 gen(rd());
+
+    StringVal topn_column("placeholder");
+    IntVal topn_num_column(TOPN_NUM);
+    IntVal space_expand_rate_column(space_expand_rate);
+    std::vector<doris_udf::AnyVal*> const_vals;
+    const_vals.push_back(&topn_column);
+    const_vals.push_back(&topn_num_column);
+    const_vals.push_back(&space_expand_rate_column);
+    ctx->impl()->set_constant_args(const_vals);
+    // Compute topN in parallel
+    StringVal dst;
+    TopNFunctions::topn_init(ctx, &dst);
+
+    StringVal single_dst_str[PARALLEL];
+    for (uint32_t i = 0; i < PARALLEL; ++i) {
+        TopNFunctions::topn_init(ctx, &single_dst_str[i]);
+    }
+
+    std::random_device random_rd;
+    std::mt19937 random_gen(random_rd());
+    std::uniform_int_distribution<> dist(0, PARALLEL - 1);
+    for (uint32_t i = 0; i < TOTAL_RECORDS; ++i) {
+        // zipf_distribution returns ranks in [1, key_space]. Shift to a 0-based
+        // index; using the rank directly read past the end of random_strs
+        // whenever the rank equalled key_space.
+        uint64_t index = zf(gen) - 1;
+        // choose one single topn to update
+        topn_single(ctx, random_strs[index], single_dst_str[dist(random_gen)], accuracy_map);
+    }
+
+    for (uint32_t i = 0; i < PARALLEL; ++i) {
+        StringVal serialized_str = TopNFunctions::topn_serialize(ctx, single_dst_str[i]);
+        TopNFunctions::topn_merge(ctx, serialized_str, &dst);
+    }
+
+    // get accuracy result
+    std::vector<Counter> accuracy_sort_vec;
+    for (const auto& entry : accuracy_map) {
+        accuracy_sort_vec.emplace_back(entry.first, entry.second);
+    }
+    std::sort(accuracy_sort_vec.begin(), accuracy_sort_vec.end(), TopNComparator());
+
+    // get topn result
+    TopNCounter* topn_dst = reinterpret_cast<TopNCounter*>(dst.ptr);
+    std::vector<Counter> topn_sort_vec;
+    topn_dst->sort_retain(TOPN_NUM, &topn_sort_vec);
+
+    // Either vector may hold fewer than TOPN_NUM entries (e.g. few distinct
+    // keys were drawn), so never index past the shorter one.
+    size_t compared = std::min<size_t>(
+            TOPN_NUM, std::min(accuracy_sort_vec.size(), topn_sort_vec.size()));
+    uint32_t error = 0;
+    for (size_t i = 0; i < compared; ++i) {
+        const Counter& accuracy_counter = accuracy_sort_vec[i];
+        const Counter& topn_counter = topn_sort_vec[i];
+        if (accuracy_counter.get_count() != topn_counter.get_count()) {
+            ++error;
+            LOG(INFO) << "Failed";
+            LOG(INFO) << "accuracy counter : (" << accuracy_counter.get_item() << ", " << accuracy_counter.get_count() << ")";
+            LOG(INFO) << "topn counter : (" << topn_counter.get_item() << ", " << topn_counter.get_count() << ")";
+        }
+    }
+    LOG(INFO) << "Total errors : " << error;
+    TopNFunctions::topn_finalize(ctx, dst);
+}
+
+// Sweeps key cardinality, space_expand_rate and zipf exponent. The accuracy
+// of each combination is logged rather than asserted.
+TEST_F(TopNFunctionsTest, topn_accuracy) {
+    const std::vector<int> key_spaces = {1000, 10000, 100000, 500000};
+    const std::vector<int> expand_rates = {20, 50, 100};
+    const std::vector<double> exponents = {0.5, 0.6, 1.0};
+    for (int key_space : key_spaces) {
+        for (int expand_rate : expand_rates) {
+            for (double exponent : exponents) {
+                test_topn_accuracy(ctx, key_space, expand_rate, exponent);
+            }
+        }
+    }
+}
+
+// "a" x10, "b" x2 and "c" x1 with n = 2 must report only a and b.
+TEST_F(TopNFunctionsTest, topn_update) {
+    StringVal dst;
+    TopNFunctions::topn_init(ctx, &dst);
+
+    const StringVal a("a");
+    const StringVal b("b");
+    const StringVal c("c");
+    for (int i = 0; i < 10; ++i) {
+        TopNFunctions::topn_update(ctx, a, 2, &dst);
+    }
+    for (int i = 0; i < 2; ++i) {
+        TopNFunctions::topn_update(ctx, b, 2, &dst);
+    }
+    TopNFunctions::topn_update(ctx, c, 2, &dst);
+
+    const StringVal expected("{\"a\":10,\"b\":2}");
+    ASSERT_EQ(expected, TopNFunctions::topn_finalize(ctx, dst));
+}
+
+// Partial states {a:10, b:8} and {a:10, c:6} merge to a:20 and b:8 on top.
+TEST_F(TopNFunctionsTest, topn_merge) {
+    StringVal dst1;
+    StringVal dst2;
+    TopNFunctions::topn_init(ctx, &dst1);
+    TopNFunctions::topn_init(ctx, &dst2);
+
+    const StringVal a("a");
+    const StringVal b("b");
+    const StringVal c("c");
+    for (int i = 0; i < 10; ++i) {
+        TopNFunctions::topn_update(ctx, a, 2, &dst1);
+        TopNFunctions::topn_update(ctx, a, 2, &dst2);
+    }
+    for (int i = 0; i < 8; ++i) {
+        TopNFunctions::topn_update(ctx, b, 2, &dst1);
+    }
+    for (int i = 0; i < 6; ++i) {
+        TopNFunctions::topn_update(ctx, c, 2, &dst2);
+    }
+
+    const StringVal partial1 = TopNFunctions::topn_serialize(ctx, dst1);
+    const StringVal partial2 = TopNFunctions::topn_serialize(ctx, dst2);
+
+    StringVal merged;
+    TopNFunctions::topn_init(ctx, &merged);
+    TopNFunctions::topn_merge(ctx, partial1, &merged);
+    TopNFunctions::topn_merge(ctx, partial2, &merged);
+
+    const StringVal expected("{\"a\":20,\"b\":8}");
+    ASSERT_EQ(expected, TopNFunctions::topn_finalize(ctx, merged));
+}
+
+// NULL inputs are skipped, so the merged result is an empty JSON object.
+TEST_F(TopNFunctionsTest, test_null_value) {
+    StringVal partial;
+    TopNFunctions::topn_init(ctx, &partial);
+
+    const IntVal null_value = IntVal::null();
+    for (int i = 0; i < 10; ++i) {
+        TopNFunctions::topn_update(ctx, null_value, 2, &partial);
+    }
+    const StringVal serialized = TopNFunctions::topn_serialize(ctx, partial);
+
+    StringVal merged;
+    TopNFunctions::topn_init(ctx, &merged);
+    TopNFunctions::topn_merge(ctx, serialized, &merged);
+
+    const StringVal expected("{}");
+    ASSERT_EQ(expected, TopNFunctions::topn_finalize(ctx, merged));
+}
+
+// DATETIME values are keyed by their "YYYY-MM-DD hh:mm:ss" text form.
+TEST_F(TopNFunctionsTest, test_date_type) {
+    StringVal partial;
+    TopNFunctions::topn_init(ctx, &partial);
+
+    DateTimeValue dt(20201001000000);
+    doris_udf::DateTimeVal dt_val;
+    dt.to_datetime_val(&dt_val);
+    for (int i = 0; i < 10; ++i) {
+        TopNFunctions::topn_update(ctx, dt_val, 1, &partial);
+    }
+    const StringVal serialized = TopNFunctions::topn_serialize(ctx, partial);
+
+    StringVal merged;
+    TopNFunctions::topn_init(ctx, &merged);
+    TopNFunctions::topn_merge(ctx, serialized, &merged);
+
+    const StringVal expected("{\"2020-10-01 00:00:00\":10}");
+    ASSERT_EQ(expected, TopNFunctions::topn_finalize(ctx, merged));
+}
+
+}
+
+// Standard gtest entry point for this test binary.
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/be/test/exprs/zipf_distribution.h
b/be/test/exprs/zipf_distribution.h
new file mode 100644
index 0000000..8ae2fda
--- /dev/null
+++ b/be/test/exprs/zipf_distribution.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <limits>
+#include <random>
+
+/**
+ * Zipf-like random distribution over the integers [1, n] with exponent q.
+ *
+ * Adapted from
+ * https://stackoverflow.com/questions/9983239/how-to-generate-zipf-distributed-numbers-efficiently
+ * which implements the method described in
+ * "Rejection-inversion to generate variates from monotone discrete
+ * distributions", Wolfgang Hörmann and Gerhard Derflinger,
+ * ACM TOMACS 6.3 (1996): 169-184.
+ */
+template<class IntType = unsigned long, class RealType = double>
+class zipf_distribution
+{
+public:
+    typedef RealType input_type;
+    typedef IntType result_type;
+
+    static_assert(std::numeric_limits<IntType>::is_integer, "");
+    static_assert(!std::numeric_limits<RealType>::is_integer, "");
+
+    /**
+     * @param n number of elements; samples are drawn from [1, n] (n >= 1 expected).
+     * @param q exponent of the distribution (see h() for the per-element weight).
+     */
+    zipf_distribution(const IntType n=std::numeric_limits<IntType>::max(),
+                      const RealType q=1.0)
+        : n(n)
+        , q(q)
+        , H_x1(H(1.5) - 1.0)
+        , H_n(H(n + 0.5))
+        , dist(H_x1, H_n)
+    {}
+
+    /** Draw one sample in [1, n] by rejection-inversion; loops until a candidate is accepted. */
+    IntType operator()(std::mt19937& rng)
+    {
+        while (true) {
+            const RealType u = dist(rng);
+            const RealType x = H_inv(u);
+            const IntType k = round_to_index(x, n);
+            if (u >= H(k + 0.5) - h(k)) {
+                return k;
+            }
+        }
+    }
+
+private:
+    /**
+     * Round x to the nearest integer and clamp it to [1, upper].
+     *
+     * The bounds are checked while the value is still floating point, because
+     * converting an out-of-range or NaN floating value to an integer type is
+     * undefined behaviour. H_inv() can yield values near upper + 0.5, which for
+     * the default n = numeric_limits<unsigned long>::max() is about 2^64 and
+     * does not fit in IntType.
+     */
+    static IntType round_to_index(const RealType x, const IntType upper)
+    {
+        const RealType r = std::round(x);
+        IntType k;
+        if (r >= static_cast<RealType>(upper)) {
+            k = upper;
+        } else if (r > static_cast<RealType>(1)) {
+            // 1 < r < RealType(upper) and r is integral, so r <= upper and the cast is in range.
+            k = static_cast<IntType>(r);
+        } else {
+            k = 1;  // below the range, or NaN
+        }
+        return std::max<IntType>(1, k);  // lower bound wins even if upper < 1
+    }
+
+    /** (exp(x) - 1) / x; uses a Taylor series near 0 to avoid dividing by ~0. */
+    static double
+    expxm1bx(const double x)
+    {
+        return (std::abs(x) > epsilon)
+            ? std::expm1(x) / x
+            : (1.0 + x/2.0 * (1.0 + x/3.0 * (1.0 + x/4.0)));
+    }
+
+    /** H(x) = log(x) if q == 1, (x^(1-q) - 1)/(1 - q) otherwise.
+     * H(x) is an integral of h(x).
+     *
+     * Note the numerator is one less than in the paper in order to work with
+     * all positive q.
+     */
+    RealType H(const RealType x) const
+    {
+        const RealType log_x = std::log(x);
+        return expxm1bx((1.0 - q) * log_x) * log_x;
+    }
+
+    /** log(1 + x) / x; uses a Taylor series near 0 to avoid dividing by ~0. */
+    static RealType
+    log1pxbx(const RealType x)
+    {
+        return (std::abs(x) > epsilon)
+            ? std::log1p(x) / x
+            : 1.0 - x * ((1/2.0) - x * ((1/3.0) - x * (1/4.0)));
+    }
+
+    /** The inverse function of H(x). */
+    RealType H_inv(const RealType x) const
+    {
+        const RealType t = std::max(-1.0, x * (1.0 - q));
+        return std::exp(log1pxbx(t) * x);
+    }
+
+    /** The hat function h(x) = 1 / (x ^ q). */
+    RealType h(const RealType x) const
+    {
+        return std::exp(-q * std::log(x));
+    }
+
+    static constexpr RealType epsilon = 1e-8;
+
+    IntType n;     ///< Number of elements
+    RealType q;    ///< Exponent
+    RealType H_x1; ///< H(x_1)
+    RealType H_n;  ///< H(n)
+    std::uniform_real_distribution<RealType> dist; ///< [H(x_1), H(n)]
+};
\ No newline at end of file
diff --git a/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md
b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md
new file mode 100644
index 0000000..4396728
--- /dev/null
+++ b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md
@@ -0,0 +1,61 @@
+---
+{
+ "title": "TOPN",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# TOPN
+## description
+### Syntax
+
+`topn(expr, INT top_num[, INT space_expand_rate])`
+
+The topn function uses the Space-Saving algorithm to find the top_num most frequent items in expr. The result is a JSON string that maps each of these items to its occurrence count, for example `{"a":20,"b":8}`. The result is an approximation.
+
+The optional space_expand_rate parameter sets the number of counters used by the Space-Saving algorithm:
+```
+counter numbers = top_num * space_expand_rate
+```
+The higher the value of space_expand_rate, the more accurate the result. The default value is 50.
+
+## example
+```
+MySQL [test]> select topn(keyword,10) from keyword_table where date>= '2020-06-01' and date <= '2020-06-19' ;
++-----------------------------------------------------------------------------------+
+| topn(`keyword`, 10)                                                               |
++-----------------------------------------------------------------------------------+
+| {"a":157,"b":138,"c":133,"d":133,"e":131,"f":127,"g":124,"h":122,"i":117,"k":117} |
++-----------------------------------------------------------------------------------+
+
+MySQL [test]> select date,topn(keyword,10,100) from keyword_table where date>= '2020-06-17' and date <= '2020-06-19' group by date;
++------------+----------------------------------------------------------------+
+| date       | topn(`keyword`, 10, 100)                                       |
++------------+----------------------------------------------------------------+
+| 2020-06-19 | {"a":11,"b":8,"c":8,"d":7,"e":7,"f":7,"g":7,"h":7,"i":7,"j":7} |
+| 2020-06-18 | {"a":10,"b":8,"c":7,"f":7,"g":7,"i":7,"k":7,"l":7,"m":6,"d":6} |
+| 2020-06-17 | {"a":9,"b":8,"c":8,"j":8,"d":7,"e":7,"f":7,"h":7,"i":7,"k":7}  |
++------------+----------------------------------------------------------------+
+```
+## keyword
+TOPN
\ No newline at end of file
diff --git a/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md
b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md
new file mode 100644
index 0000000..11c4544
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md
@@ -0,0 +1,60 @@
+---
+{
+ "title": "TOPN",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# TOPN
+## description
+### Syntax
+
+`topn(expr, INT top_num[, INT space_expand_rate])`
+
+topn函数使用Space-Saving算法计算expr中出现次数最多的top_num个频繁项,结果为JSON字符串,包含这些频繁项及其出现次数(例如 `{"a":20,"b":8}`),该结果为近似值
+
+space_expand_rate参数是可选项,该值用来设置Space-Saving算法中使用的counter个数
+```
+counter numbers = top_num * space_expand_rate
+```
+space_expand_rate的值越大,结果越准确,默认值为50
+
+## example
+```
+MySQL [test]> select topn(keyword,10) from keyword_table where date>= '2020-06-01' and date <= '2020-06-19' ;
++-----------------------------------------------------------------------------------+
+| topn(`keyword`, 10)                                                               |
++-----------------------------------------------------------------------------------+
+| {"a":157,"b":138,"c":133,"d":133,"e":131,"f":127,"g":124,"h":122,"i":117,"k":117} |
++-----------------------------------------------------------------------------------+
+
+MySQL [test]> select date,topn(keyword,10,100) from keyword_table where date>= '2020-06-17' and date <= '2020-06-19' group by date;
++------------+----------------------------------------------------------------+
+| date       | topn(`keyword`, 10, 100)                                       |
++------------+----------------------------------------------------------------+
+| 2020-06-19 | {"a":11,"b":8,"c":8,"d":7,"e":7,"f":7,"g":7,"h":7,"i":7,"j":7} |
+| 2020-06-18 | {"a":10,"b":8,"c":7,"f":7,"g":7,"i":7,"k":7,"l":7,"m":6,"d":6} |
+| 2020-06-17 | {"a":9,"b":8,"c":8,"j":8,"d":7,"e":7,"f":7,"h":7,"i":7,"k":7}  |
++------------+----------------------------------------------------------------+
+```
+## keyword
+TOPN
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 0b0ef1b..9f41e69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.ScalarFunction;
+import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
@@ -455,6 +456,30 @@ public class FunctionCallExpr extends Expr {
}
}
}
+
+ if (fnName.getFunction().equalsIgnoreCase("topn")) {
+ if (children.size() != 2 && children.size() != 3) {
+ throw new AnalysisException("topn(expr, INT [, B]) requires
two or three parameters");
+ }
+ if (!getChild(1).isConstant() ||
!getChild(1).getType().isIntegerType()) {
+ throw new AnalysisException("topn requires second parameter
must be a constant Integer Type: "
+ + this.toSql());
+ }
+ if (getChild(1).getType() != ScalarType.INT) {
+ Expr e = getChild(1).castTo(ScalarType.INT);
+ setChild(1, e);
+ }
+ if (children.size() == 3) {
+ if (!getChild(2).isConstant() ||
!getChild(2).getType().isIntegerType()) {
+ throw new AnalysisException("topn requires the third
parameter must be a constant Integer Type: "
+ + this.toSql());
+ }
+ if (getChild(2).getType() != ScalarType.INT) {
+ Expr e = getChild(2).castTo(ScalarType.INT);
+ setChild(2, e);
+ }
+ }
+ }
}
// Provide better error message for some aggregate builtins. These can be
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index 579e19c..10ecbdf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -817,6 +817,70 @@ public class FunctionSet {
"_ZN5doris15BitmapFunctions25bitmap_intersect_finalizeINS_11StringValueEEEN9doris_udf9BigIntValEPNS3_15FunctionContextERKNS3_9StringValE")
.build();
+ // Mangled C++ symbols of
+ // doris::TopNFunctions::topn_update<T>(doris_udf::FunctionContext*, const T&,
+ //                                      const doris_udf::IntVal&, doris_udf::StringVal*),
+ // keyed by the SQL type of topn's first argument. The IntVal parameter carries top_num.
+ // CHAR/VARCHAR share the StringVal instantiation and DATE/DATETIME share DateTimeVal.
+ // Used when registering the two-argument builtin topn(expr, INT).
+ private static final Map<Type, String> TOPN_UPDATE_SYMBOL =
+ ImmutableMap.<Type, String>builder()
+ .put(Type.BOOLEAN,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.TINYINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.SMALLINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.INT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_PNS2_9StringValE")
+ .put(Type.BIGINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.FLOAT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.DOUBLE,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.CHAR,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
+ .put(Type.VARCHAR,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
+ .put(Type.DATE,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.DATETIME,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.DECIMAL,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.DECIMALV2,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .put(Type.LARGEINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+ .build();
+
+ // Mangled C++ symbols of the three-argument overload
+ // doris::TopNFunctions::topn_update<T>(doris_udf::FunctionContext*, const T&,
+ //     const doris_udf::IntVal&, const doris_udf::IntVal&, doris_udf::StringVal*),
+ // keyed by the SQL type of topn's first argument. The two IntVal parameters carry
+ // top_num and space_expand_rate. CHAR/VARCHAR and DATE/DATETIME share instantiations.
+ // Used when registering the builtin topn(expr, INT, INT).
+ private static final Map<Type, String> TOPN_UPDATE_MORE_PARAM_SYMBOL =
+ ImmutableMap.<Type, String>builder()
+ .put(Type.BOOLEAN,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.TINYINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.SMALLINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.INT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_SA_PNS2_9StringValE")
+ .put(Type.BIGINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.FLOAT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.DOUBLE,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.CHAR,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_")
+ .put(Type.VARCHAR,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_")
+ .put(Type.DATE,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.DATETIME,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.DECIMAL,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.DECIMALV2,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .put(Type.LARGEINT,
+
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+ .build();
+
public Function getFunction(Function desc, Function.CompareMode mode) {
List<Function> fns = functions.get(desc.functionName());
if (fns == null) {
@@ -1185,6 +1249,26 @@ public class FunctionSet {
"_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
+ // TopN
+ if (TOPN_UPDATE_SYMBOL.containsKey(t)) {
+ addBuiltin(AggregateFunction.createBuiltin("topn",
+ Lists.newArrayList(t, Type.INT), Type.VARCHAR,
Type.VARCHAR,
+
"_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
+ TOPN_UPDATE_SYMBOL.get(t),
+
"_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
+
"_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+
"_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+ true, false, true));
+ addBuiltin(AggregateFunction.createBuiltin("topn",
+ Lists.newArrayList(t, Type.INT, Type.INT),
Type.VARCHAR, Type.VARCHAR,
+
"_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
+ TOPN_UPDATE_MORE_PARAM_SYMBOL.get(t),
+
"_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
+
"_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+
"_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+ true, false, true));
+ }
+
if (STDDEV_UPDATE_SYMBOL.containsKey(t)) {
addBuiltin(AggregateFunction.createBuiltin("stddev",
Lists.newArrayList(t), STDDEV_RETTYPE_SYMBOL.get(t),
Type.VARCHAR,
diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto
index 846b25a..7326b88 100644
--- a/gensrc/proto/olap_common.proto
+++ b/gensrc/proto/olap_common.proto
@@ -48,3 +48,13 @@ enum CompressKind {
COMPRESS_LZ4 = 2;
}
+// One tracked item (as a string) together with its occurrence count.
+message PCounter {
+ required string item = 1;
+ required uint64 count = 2;
+}
+
+// Serialized TopN counter state. Presumably this is the wire form of the topn
+// aggregate's intermediate result (NOTE(review): confirm against topn_counter.cpp).
+message PTopNCounter {
+ // Number of most frequent items the caller asked for.
+ required uint32 top_num = 1;
+ // Per the topn docs, the number of counters kept is top_num * space_expand_rate.
+ required uint32 space_expand_rate = 2;
+ // The tracked items and their counts.
+ repeated PCounter counter = 3;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]