This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 650536d  [Feature] Add Topn udaf (#4803)
650536d is described below

commit 650536d53efc637bb297addc6c061890f2a70434
Author: Youngwb <[email protected]>
AuthorDate: Wed Dec 16 21:58:34 2020 +0800

    [Feature] Add Topn udaf (#4803)
    
    For #4674
    This is a UDAF for approximate top-N using the Space-Saving algorithm. At
    present, it can only compute the most frequent items in a single column
    and their frequencies; on top of it we can implement topN functions
    similar to those supported by Kylin in the future.
    
    I have also added a test that measures the accuracy of this algorithm. The
    following is a rough result of one run. The data set is 1 million rows
    following a Zipfian distribution. "Element cardinality" is the number of
    distinct values, and the column headers 20X, 50X and 100X are the
    space_expand_rate values (20, 50 and 100), which determine the number of
    counters kept by the Space-Saving algorithm.
    
    ```
    zf exponent = 0.5
    Element cardinality         20X        50X          100X
                1000            100%       100%         100%
                10000           100%       100%         100%
                100000          100%       100%         100%
                500000           94%        98%          99%
    
    zf exponent = 0.6, 1.0
    Element cardinality         20X        50X          100X
                1000            100%       100%         100%
                10000           100%       100%         100%
                100000          100%       100%         100%
                500000          100%       100%         100%
    
    ```
---
 be/src/common/daemon.cpp                           |   2 +
 be/src/exprs/CMakeLists.txt                        |   3 +-
 be/src/exprs/topn_function.cpp                     | 119 ++++++++++
 be/src/exprs/topn_function.h                       |  47 ++++
 be/src/util/CMakeLists.txt                         |   1 +
 be/src/util/topn_counter.cpp                       | 142 +++++++++++
 be/src/util/topn_counter.h                         | 182 +++++++++++++++
 be/test/exprs/CMakeLists.txt                       |   1 +
 be/test/exprs/topn_function_test.cpp               | 259 +++++++++++++++++++++
 be/test/exprs/zipf_distribution.h                  | 120 ++++++++++
 .../sql-functions/aggregate-functions/topn.md      |  61 +++++
 .../sql-functions/aggregate-functions/topn.md      |  60 +++++
 .../apache/doris/analysis/FunctionCallExpr.java    |  25 ++
 .../java/org/apache/doris/catalog/FunctionSet.java |  84 +++++++
 gensrc/proto/olap_common.proto                     |  10 +
 15 files changed, 1115 insertions(+), 1 deletion(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c05ef99..e57d027 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -41,6 +41,7 @@
 #include "exprs/string_functions.h"
 #include "exprs/time_operators.h"
 #include "exprs/timestamp_functions.h"
+#include "exprs/topn_function.h"
 #include "exprs/utility_functions.h"
 #include "geo/geo_functions.h"
 #include "olap/options.h"
@@ -261,6 +262,7 @@ void Daemon::init(int argc, char** argv, const 
std::vector<StorePath>& paths) {
     BitmapFunctions::init();
     HllFunctions::init();
     HashFunctions::init();
+    TopNFunctions::init();
 
     LOG(INFO) << CpuInfo::debug_string();
     LOG(INFO) << DiskInfo::debug_string();
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 9194724..bc179e5 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -65,4 +65,5 @@ add_library(Exprs
   new_agg_fn_evaluator.cc
   bitmap_function.cpp
   hll_function.cpp
-  grouping_sets_functions.cpp)
+  grouping_sets_functions.cpp
+  topn_function.cpp)
diff --git a/be/src/exprs/topn_function.cpp b/be/src/exprs/topn_function.cpp
new file mode 100644
index 0000000..5ea160f
--- /dev/null
+++ b/be/src/exprs/topn_function.cpp
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/topn_function.h"
+#include "util/topn_counter.h"
+#include "util/slice.h"
+
+namespace doris {
+
+using doris_udf::AnyVal;
+
+// Initialisation hook for the TopN function family. The body is empty, so the
+// call currently has no effect; Daemon::init invokes it next to the other
+// *Functions::init() hooks.
+void TopNFunctions::init() {
+}
+
+// Aggregate init: allocates the heap TopNCounter that carries the intermediate
+// state and publishes its address through dst->ptr (dst->len is set to
+// sizeof(TopNCounter)). The counter is released later by topn_serialize() or
+// topn_finalize().
+//
+// The optional constant argument at index 2 is the space expand rate. When it
+// is absent, NULL, zero or negative the default rate is used: casting such a
+// value to uint32_t would otherwise yield a zero or a wrapped-around, huge
+// capacity.
+void TopNFunctions::topn_init(FunctionContext* ctx, StringVal *dst) {
+    uint32_t space_expand_rate = DEFAULT_SPACE_EXPAND_RATE;
+    const AnyVal* space_expand_rate_val = ctx->get_constant_arg(2);
+    if (space_expand_rate_val != nullptr && !space_expand_rate_val->is_null) {
+        const int32_t requested = reinterpret_cast<const IntVal*>(space_expand_rate_val)->val;
+        if (requested > 0) {
+            space_expand_rate = static_cast<uint32_t>(requested);
+        }
+    }
+
+    dst->is_null = false;
+    dst->len = sizeof(TopNCounter);
+    dst->ptr = reinterpret_cast<uint8_t*>(new TopNCounter(space_expand_rate));
+}
+
+// Aggregate update for the (column, topn) form: records one occurrence of
+// `src` in the TopNCounter stored behind dst->ptr. NULL inputs are skipped.
+template <typename T>
+void TopNFunctions::topn_update(FunctionContext*, const T& src, const IntVal& topn, StringVal* dst) {
+    if (!src.is_null) {
+        auto* counter = reinterpret_cast<TopNCounter*>(dst->ptr);
+        // set_top_num() also recomputes the counter's capacity from the rate.
+        counter->set_top_num(topn.val);
+        counter->add_item(src);
+    }
+}
+
+// Aggregate update for the (column, topn, space_expand_rate) form. The rate
+// is not read here (topn_init() picks it up from the constant arguments), so
+// the work is exactly that of the four-parameter overload.
+template <typename T>
+void TopNFunctions::topn_update(FunctionContext *ctx, const T &src, const IntVal &topn,
+                                const IntVal &space_expand_rate, StringVal *dst) {
+    topn_update(ctx, src, topn, dst);
+}
+
+// Aggregate merge: rebuilds a partial TopNCounter from the serialized bytes
+// in `src` and folds it into the TopNCounter stored behind dst->ptr.
+// A NULL `src` leaves `dst` untouched.
+void TopNFunctions::topn_merge(FunctionContext* ctx, const StringVal &src, StringVal *dst) {
+    if (!src.is_null) {
+        auto* accumulated = reinterpret_cast<TopNCounter*>(dst->ptr);
+        const Slice serialized(src.ptr, src.len);
+        accumulated->merge(TopNCounter(serialized));
+    }
+}
+
+// Aggregate serialize: encodes the TopNCounter behind src.ptr into a
+// ctx-allocated StringVal and deletes the counter.
+StringVal TopNFunctions::topn_serialize(FunctionContext *ctx, const StringVal &src) {
+    auto* counter = reinterpret_cast<TopNCounter*>(src.ptr);
+    std::string encoded;
+    counter->serialize(&encoded);
+    // The encoded copy is all that is needed from here on.
+    delete counter;
+
+    StringVal result(ctx, encoded.size());
+    memcpy(result.ptr, encoded.data(), encoded.size());
+    return result;
+}
+
+// Aggregate finalize: renders the TopNCounter behind src.ptr as its final
+// string (see TopNCounter::finalize), copies it into a ctx-allocated
+// StringVal and deletes the counter.
+StringVal TopNFunctions::topn_finalize(FunctionContext* ctx, const StringVal &src) {
+    auto* counter = reinterpret_cast<TopNCounter*>(src.ptr);
+    std::string rendered;
+    counter->finalize(rendered);
+    delete counter;
+
+    StringVal result(ctx, rendered.size());
+    memcpy(result.ptr, rendered.data(), rendered.size());
+    return result;
+}
+
+// Explicit instantiations of the (column, topn) topn_update overload, one per
+// supported column value type. The template bodies live in this .cpp file, so
+// these instantiations are what other translation units link against.
+template void TopNFunctions::topn_update(FunctionContext*, const BooleanVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const TinyIntVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const 
SmallIntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const IntVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const BigIntVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const FloatVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const DoubleVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const StringVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DateTimeVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const LargeIntVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DecimalVal&, 
const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const 
DecimalV2Val&, const IntVal&, StringVal*);
+
+// Same set of value types for the (column, topn, space_expand_rate) overload.
+template void TopNFunctions::topn_update(FunctionContext*, const BooleanVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const TinyIntVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const 
SmallIntVal&, const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const IntVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const BigIntVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const FloatVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const DoubleVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext *, const StringVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DateTimeVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const LargeIntVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const DecimalVal&, 
const IntVal&, const IntVal&, StringVal*);
+template void TopNFunctions::topn_update(FunctionContext*, const 
DecimalV2Val&, const IntVal&, const IntVal&, StringVal*);
+
+}
\ No newline at end of file
diff --git a/be/src/exprs/topn_function.h b/be/src/exprs/topn_function.h
new file mode 100644
index 0000000..b50700e
--- /dev/null
+++ b/be/src/exprs/topn_function.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H
+#define DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H
+
+#include "udf/udf.h"
+
+namespace doris {
+
+// Static entry points of the approximate top-N aggregate. The intermediate
+// state is a heap-allocated TopNCounter whose address travels in a StringVal.
+class TopNFunctions {
+public:
+    // Initialisation hook; the implementation is currently empty.
+    static void init();
+
+    // Allocates the TopNCounter state and stores its address in dst->ptr.
+    static void topn_init(FunctionContext*, StringVal* dst);
+
+    // Adds one occurrence of `src` to the state; NULL `src` is ignored.
+    template <typename T>
+    static void topn_update(FunctionContext*, const T& src, const IntVal& 
topn, StringVal* dst);
+
+    // Same as above for the form that also takes a space expand rate; the
+    // rate itself is read from the constant arguments in topn_init().
+    template <typename T>
+    static void topn_update(FunctionContext*, const T& src, const IntVal& 
topn, const IntVal& space_expand_rate,
+            StringVal* dst);
+
+    // Folds a serialized partial state (`src`) into the state behind `dst`.
+    static void topn_merge(FunctionContext*,const StringVal& src, StringVal* 
dst);
+
+    // Encodes the state behind `src` into a new StringVal and deletes the state.
+    static StringVal topn_serialize(FunctionContext* ctx, const StringVal& 
src);
+
+    // Renders the final result string from the state and deletes the state.
+    static StringVal topn_finalize(FunctionContext*, const StringVal& src);
+};
+
+}
+
+#endif //DORIS_BE_SRC_EXPRS_TOPN_FUNCTION_H
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index adb21a6..ce1cd19 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -102,6 +102,7 @@ set(UTIL_FILES
   brpc_stub_cache.cpp
   zlib.cpp
   pprof_utils.cpp
+  topn_counter.cpp
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/util/topn_counter.cpp b/be/src/util/topn_counter.cpp
new file mode 100644
index 0000000..bb6052f
--- /dev/null
+++ b/be/src/util/topn_counter.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <rapidjson/writer.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "gen_cpp/olap_common.pb.h"
+#include "topn_counter.h"
+#include "slice.h"
+
+namespace doris {
+
+// Adds `incrementCount` occurrences of `item`: bumps the existing counter or
+// creates a new one, then marks the sorted vector as out of date.
+void TopNCounter::add_item(const std::string& item, uint64_t incrementCount) {
+    auto existing = _counter_map->find(item);
+    if (existing == _counter_map->end()) {
+        _counter_map->emplace(item, Counter(item, incrementCount));
+    } else {
+        existing->second.add_count(incrementCount);
+    }
+    _ordered = false;
+}
+
+// Encodes this counter as a PTopNCounter protobuf into `buffer`. The counters
+// are first sorted and trimmed to `_capacity`, so the encoded list is in
+// descending count order.
+void TopNCounter::serialize(std::string* buffer) {
+    sort_retain(_capacity);
+
+    PTopNCounter pb_counter;
+    pb_counter.set_top_num(_top_num);
+    pb_counter.set_space_expand_rate(_space_expand_rate);
+    for (const Counter& entry : *_counter_vec) {
+        PCounter* pb_entry = pb_counter.add_counter();
+        pb_entry->set_item(entry.get_item());
+        pb_entry->set_count(entry.get_count());
+    }
+    pb_counter.SerializeToString(buffer);
+}
+
+// Loads the state written by serialize(). Returns false (after logging a
+// warning) when `src` is not a valid PTopNCounter; in that case no member is
+// modified. On success both the map and the vector are filled in the encoded
+// order and the counter is marked as ordered.
+bool TopNCounter::deserialize(const doris::Slice &src) {
+    PTopNCounter pb_counter;
+    const bool parsed = pb_counter.ParseFromArray(src.data, src.size);
+    if (!parsed) {
+        LOG(WARNING) << "topn counter deserialize failed";
+        return false;
+    }
+
+    // The rate has to be in place before set_top_num(), which derives
+    // _capacity from it.
+    _space_expand_rate = pb_counter.space_expand_rate();
+    set_top_num(pb_counter.top_num());
+    for (int idx = 0, total = pb_counter.counter_size(); idx < total; ++idx) {
+        const PCounter& entry = pb_counter.counter(idx);
+        _counter_map->emplace(entry.item(), Counter(entry.item(), entry.count()));
+        _counter_vec->emplace_back(entry.item(), entry.count());
+    }
+    _ordered = true;
+    return true;
+}
+
+// Rebuilds _counter_vec from _counter_map, sorted by descending count and
+// trimmed to `capacity` entries (trimmed items are also erased from the map),
+// then marks the counter as ordered.
+void TopNCounter::sort_retain(uint32_t capacity) {
+    // Start from an empty vector: the two-argument overload appends.
+    _counter_vec->clear();
+    sort_retain(capacity, _counter_vec);
+    _ordered = true;
+}
+
+// Appends every counter of _counter_map to `sort_vec`, sorts `sort_vec` by
+// descending count and drops entries from its tail until at most `capacity`
+// remain. Each dropped entry is erased from _counter_map as well.
+void TopNCounter::sort_retain(uint32_t capacity, std::vector<Counter>* sort_vec) {
+    for (const auto& entry : *_counter_map) {
+        sort_vec->emplace_back(entry.second.get_item(), entry.second.get_count());
+    }
+    std::sort(sort_vec->begin(), sort_vec->end(), TopNComparator());
+
+    while (sort_vec->size() > capacity) {
+        _counter_map->erase(sort_vec->back().get_item());
+        sort_vec->pop_back();
+    }
+}
+
+// Based on the parallel version of the Space Saving algorithm as described in:
+// "A parallel space saving algorithm for frequent items and the Hurwitz zeta
+// distribution" by Massimo Cafaro, et al.
+//
+// Folds `other` into this counter and leaves this counter sorted and trimmed
+// to its capacity. An empty `other` is a no-op. Otherwise this counter adopts
+// other's top_num and space expand rate.
+//
+// m1 / m2 are the smallest retained counts of this / other when the
+// respective side is full, and 0 otherwise. Items seen on one side only are
+// credited with the other side's minimum; items seen on both sides end up
+// with the sum of their counts.
+void TopNCounter::merge(doris::TopNCounter &&other) {
+    if (other._counter_map->empty()) {
+        return;
+    }
+
+    _space_expand_rate = other._space_expand_rate;
+    set_top_num(other._top_num);
+
+    // The minima are read from the tail of the sorted vectors. Refresh a
+    // vector that is out of date (e.g. after add_item()) instead of reading a
+    // stale or empty one.
+    if (!_ordered) {
+        sort_retain(_capacity);
+    }
+    if (!other._ordered) {
+        other.sort_retain(other._capacity);
+    }
+
+    const bool this_full = _counter_map->size() >= _capacity;
+    const bool another_full = other._counter_map->size() >= other._capacity;
+
+    const uint64_t m1 =
+            (this_full && !_counter_vec->empty()) ? _counter_vec->back().get_count() : 0;
+    const uint64_t m2 = (another_full && !other._counter_vec->empty())
+                                ? other._counter_vec->back().get_count()
+                                : 0;
+
+    if (another_full) {
+        for (auto &entry : *_counter_map) {
+            entry.second.add_count(m2);
+        }
+    }
+
+    for (auto &other_entry : *(other._counter_map)) {
+        auto itr = _counter_map->find(other_entry.first);
+        if (itr != _counter_map->end()) {
+            // This item already received m2 above; take it back out so the
+            // result is the plain sum of both counts.
+            itr->second.add_count(other_entry.second.get_count() - m2);
+        } else {
+            _counter_map->insert(std::make_pair(other_entry.first,
+                    Counter(other_entry.first, other_entry.second.get_count() + m1)));
+        }
+    }
+    _ordered = false;
+    sort_retain(_capacity);
+}
+
+// Writes the result into `finalize_str` as a JSON object that maps each of
+// the (at most) _top_num most frequent items to its count, in descending
+// count order, e.g. {"a":10,"b":2}.
+void TopNCounter::finalize(std::string& finalize_str) {
+    if (!_ordered) {
+        sort_retain(_top_num);
+    }
+    // use json format print
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    writer.StartObject();
+    uint32_t emitted = 0;
+    for (const Counter& counter : *_counter_vec) {
+        if (emitted >= _top_num) {
+            break;
+        }
+        // Pass the length explicitly: items are arbitrary byte strings, and
+        // the single-argument Key() stops at the first NUL byte.
+        const std::string& item = counter.get_item();
+        writer.Key(item.data(), static_cast<rapidjson::SizeType>(item.size()));
+        writer.Uint64(counter.get_count());
+        ++emitted;
+    }
+    writer.EndObject();
+    finalize_str.assign(buffer.GetString(), buffer.GetSize());
+}
+
+}
diff --git a/be/src/util/topn_counter.h b/be/src/util/topn_counter.h
new file mode 100644
index 0000000..f8dc584
--- /dev/null
+++ b/be/src/util/topn_counter.h
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_UTI_TOPN_COUNTER_H
+#define DORIS_BE_SRC_UTI_TOPN_COUNTER_H
+
+#include <list>
+#include <unordered_map>
+
+#include "common/logging.h"
+#include "runtime/datetime_value.h"
+#include "runtime/decimal_value.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/large_int_value.h"
+#include "udf/udf.h"
+
+namespace doris {
+
+static const uint32_t DEFAULT_SPACE_EXPAND_RATE = 50;
+
+class Slice;
+
+// One tracked item (in its string form) together with its occurrence count.
+class Counter {
+public:
+    // Creates an empty item with a count of zero.
+    Counter() = default;
+
+    Counter(const std::string& item, uint64_t count) : _item(item), _count(count) {}
+
+    uint64_t get_count() const {
+        return _count;
+    }
+
+    const std::string& get_item() const {
+        return _item;
+    }
+
+    // Increases the count by `count`.
+    void add_count(uint64_t count) {
+        _count += count;
+    }
+
+    // Two counters are equal when both the item and the count match.
+    // Declared const so that it can be used on const counters.
+    bool operator == (const Counter& other) const {
+        return _item == other._item && _count == other._count;
+    }
+
+private:
+    std::string _item;
+    // Initialised here so that a default-constructed counter has a defined count.
+    uint64_t _count = 0;
+};
+
+
+// Refer to TopNCounter.java in https://github.com/apache/kylin
+// Based on the Space-Saving algorithm and the Stream-Summary data structure as described in:
+// Efficient Computation of Frequent and Top-k Elements in Data Streams by Metwally, Agrawal, and Abbadi
+//
+// Keeps one Counter per distinct item, keyed by the item's string form, in
+// _counter_map. _counter_vec holds the same counters sorted by descending
+// count; it is only up to date while _ordered is true.
+class TopNCounter {
+public:
+    // Creates an empty counter; top_num (and therefore the capacity) stays 0
+    // until set_top_num() is called.
+    TopNCounter(uint32_t space_expand_rate = DEFAULT_SPACE_EXPAND_RATE) :
+            _top_num(0), _space_expand_rate(space_expand_rate), _capacity(0), _ordered(false),
+            _counter_map(new std::unordered_map<std::string, Counter>()),
+            _counter_vec(new std::vector<Counter>()) {}
+
+    // Creates a counter from the bytes produced by serialize(). A parse
+    // failure is only caught by the DCHECK; the counter is then left empty.
+    TopNCounter(const Slice& src) :
+            _top_num(0), _space_expand_rate(0), _capacity(0), _ordered(false),
+            _counter_map(new std::unordered_map<std::string, Counter>()),
+            _counter_vec(new std::vector<Counter>()) {
+        bool res = deserialize(src);
+        DCHECK(res);
+    }
+
+    // The containers are owned through raw pointers, so a member-wise copy
+    // would delete them twice. Copying is therefore disabled.
+    TopNCounter(const TopNCounter&) = delete;
+    TopNCounter& operator=(const TopNCounter&) = delete;
+
+    ~TopNCounter() {
+        delete _counter_map;
+        delete _counter_vec;
+    }
+
+    // Records a single occurrence of `item`.
+    template <typename T>
+    void add_item(const T& item) {
+        add_item(item, 1);
+    }
+
+    // Typed overloads: each converts the value to its string form and
+    // forwards to add_item(const std::string&, uint64_t).
+    void add_item(const BooleanVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const TinyIntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const SmallIntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const IntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const BigIntVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const FloatVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const DoubleVal& item, uint64_t incrementCount) {
+        add_item_numeric(item, incrementCount);
+    }
+    void add_item(const StringVal& item, uint64_t incrementCount) {
+        add_item(std::string((char*) item.ptr, item.len), incrementCount);
+    }
+    void add_item(const DateTimeVal& item, uint64_t incrementCount) {
+        char str[MAX_DTVALUE_STR_LEN];
+        DateTimeValue::from_datetime_val(item).to_string(str);
+        add_item(std::string(str), incrementCount);
+    }
+    void add_item(const LargeIntVal& item, uint64_t incrementCount) {
+        add_item(LargeIntValue::to_string(item.val), incrementCount);
+    }
+    void add_item(const DecimalVal& item, uint64_t incrementCount) {
+        add_item(DecimalValue::from_decimal_val(item).to_string(), incrementCount);
+    }
+    void add_item(const DecimalV2Val& item, uint64_t incrementCount) {
+        add_item(DecimalV2Value::from_decimal_val(item).to_string(), incrementCount);
+    }
+
+    // Uses std::to_string(item.val) as the item key.
+    template <typename T>
+    void add_item_numeric(const T& item, uint64_t incrementCount) {
+        add_item(std::to_string(item.val), incrementCount);
+    }
+
+    void add_item(const std::string& item, uint64_t incrementCount);
+
+    void serialize(std::string* buffer);
+
+    bool deserialize(const Slice& src);
+
+    void merge(doris::TopNCounter&& other);
+
+    // Sort counter by count value and record it in _counter_vec
+    void sort_retain(uint32_t capacity);
+
+    void sort_retain(uint32_t capacity, std::vector<Counter>* sort_vec);
+
+    void finalize(std::string&);
+
+    // Sets the requested N and derives the capacity (N * space expand rate).
+    // The product is computed in 64 bits so that it cannot wrap around
+    // before being stored in the 64-bit _capacity.
+    void set_top_num(uint32_t top_num) {
+        _top_num = top_num;
+        _capacity = static_cast<uint64_t>(top_num) * _space_expand_rate;
+    }
+
+private:
+    uint32_t _top_num;
+    uint32_t _space_expand_rate;
+    uint64_t _capacity;
+    bool _ordered;
+    std::unordered_map<std::string, Counter>* _counter_map;
+    std::vector<Counter>* _counter_vec;
+};
+
+// Strict ordering used to sort counters by descending count.
+class TopNComparator
+{
+public:
+    // const-qualified so the comparator can also be invoked through a const
+    // object or reference.
+    bool operator () (const Counter& s1, const Counter& s2) const
+    {
+        return s1.get_count() > s2.get_count();
+    }
+};
+}
+
+#endif //DORIS_BE_SRC_UTI_TOPN_COUNTER_H
diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt
index fef8172..8c393aa 100644
--- a/be/test/exprs/CMakeLists.txt
+++ b/be/test/exprs/CMakeLists.txt
@@ -34,4 +34,5 @@ ADD_BE_TEST(hll_function_test)
 ADD_BE_TEST(encryption_functions_test)
 #ADD_BE_TEST(in-predicate-test)
 ADD_BE_TEST(math_functions_test)
+ADD_BE_TEST(topn_function_test)
 
diff --git a/be/test/exprs/topn_function_test.cpp 
b/be/test/exprs/topn_function_test.cpp
new file mode 100644
index 0000000..f13c2e2
--- /dev/null
+++ b/be/test/exprs/topn_function_test.cpp
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/anyval_util.h"
+#include "exprs/topn_function.h"
+#include "util/topn_counter.h"
+#include "testutil/function_utils.h"
+#include "zipf_distribution.h"
+
+#include <gtest/gtest.h>
+#include <unordered_map>
+
+
+namespace doris {
+
+static const uint32_t TOPN_NUM = 100;
+static const uint32_t TOTAL_RECORDS = 1000000;
+static const uint32_t PARALLEL = 10;
+
+// Returns a string of `len` characters, each drawn uniformly from [0-9A-Za-z].
+// A fresh random_device-seeded engine is created on every call.
+std::string gen_random(const int len) {
+    const std::string alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    std::random_device seed_source;
+    std::mt19937 engine(seed_source());
+    std::uniform_int_distribution<> pick(0, alphabet.size()-1);
+
+    std::string out;
+    out.reserve(len);
+    for (int i = 0; i < len; ++i) {
+        out.push_back(alphabet[pick(engine)]);
+    }
+    return out;
+}
+
+// Test fixture: creates a FunctionUtils per test and exposes the
+// FunctionContext obtained from it.
+class TopNFunctionsTest : public testing::Test {
+public:
+    TopNFunctionsTest() = default;
+
+    void SetUp() override {
+        utils = new FunctionUtils();
+        ctx = utils->get_fn_ctx();
+    }
+
+    void TearDown() override {
+        delete utils;
+        // ctx was obtained from utils; do not keep it around after utils is gone.
+        utils = nullptr;
+        ctx = nullptr;
+    }
+
+protected:
+    // TEST_F bodies are members of a class derived from this fixture, so the
+    // members they use must be protected rather than private.
+    FunctionUtils *utils = nullptr;
+    FunctionContext *ctx = nullptr;
+};
+
+// Counts one more occurrence of `item` in the exact-frequency reference map.
+void update_accuracy_map(const std::string& item, std::unordered_map<std::string, uint32_t>& accuracy_map) {
+    // operator[] value-initialises a missing entry to 0, so the first
+    // occurrence ends up as 1.
+    ++accuracy_map[item];
+}
+
+// Feeds one record into a single partial TopN state and mirrors it in the
+// exact-frequency reference map.
+void topn_single(FunctionContext* ctx, std::string& random_str, StringVal& dst, std::unordered_map<std::string, uint32_t>& accuracy_map){
+    uint8_t* bytes = (uint8_t*) random_str.data();
+    TopNFunctions::topn_update(ctx, StringVal(bytes, random_str.length()), TOPN_NUM, &dst);
+    update_accuracy_map(random_str, accuracy_map);
+}
+
+// Feeds TOTAL_RECORDS Zipf-distributed strings into PARALLEL partial TopN
+// states, merges them, and logs how many of the top TOPN_NUM counts differ
+// from the exact counts kept in a plain hash map.
+//
+//   key_space                   number of distinct strings
+//   space_expand_rate           passed to the UDAF as its third constant argument
+//   zipf_distribution_exponent  exponent of the Zipf distribution
+void test_topn_accuracy(FunctionContext* ctx, int key_space, int space_expand_rate, double zipf_distribution_exponent) {
+    LOG(INFO) << "topn accuracy : " << "key space : "  << key_space << " , space_expand_rate : " << space_expand_rate <<
+        " , zf exponent : " << zipf_distribution_exponent;
+    std::unordered_map<std::string, uint32_t> accuracy_map;
+    // prepare random data
+    std::vector<std::string> random_strs(key_space);
+    for (auto& random_str : random_strs) {
+        random_str = gen_random(10);
+    }
+
+    zipf_distribution<uint64_t, double> zf(key_space, zipf_distribution_exponent);
+    std::random_device rd;
+    std::mt19937 gen(rd());
+
+    StringVal topn_column("placeholder");
+    IntVal topn_num_column(TOPN_NUM);
+    IntVal space_expand_rate_column(space_expand_rate);
+    std::vector<doris_udf::AnyVal*> const_vals;
+    const_vals.push_back(&topn_column);
+    const_vals.push_back(&topn_num_column);
+    const_vals.push_back(&space_expand_rate_column);
+    ctx->impl()->set_constant_args(const_vals);
+    // Compute topN in parallel
+    StringVal dst;
+    TopNFunctions::topn_init(ctx, &dst);
+
+    StringVal single_dst_str[PARALLEL];
+    for (uint32_t i = 0; i < PARALLEL; ++i) {
+        TopNFunctions::topn_init(ctx, &single_dst_str[i]);
+    }
+
+    std::random_device random_rd;
+    std::mt19937 random_gen(random_rd());
+    std::uniform_int_distribution<> dist(0, PARALLEL-1);
+    for (uint32_t i = 0; i < TOTAL_RECORDS; ++i) {
+        // zipf_distribution clamps its samples to [1, key_space]; shift them
+        // to the valid vector indexes [0, key_space - 1].
+        const uint32_t index = zf(gen) - 1;
+        // choose one single topn to update
+        topn_single(ctx, random_strs[index], single_dst_str[dist(random_gen)], accuracy_map);
+    }
+
+    for (uint32_t i = 0; i < PARALLEL; ++i) {
+        StringVal serialized_str  = TopNFunctions::topn_serialize(ctx, single_dst_str[i]);
+        TopNFunctions::topn_merge(ctx, serialized_str, &dst);
+    }
+
+    // get accuracy result
+    std::vector<Counter> accuracy_sort_vec;
+    for (const auto& exact : accuracy_map) {
+        accuracy_sort_vec.emplace_back(exact.first, exact.second);
+    }
+    std::sort(accuracy_sort_vec.begin(), accuracy_sort_vec.end(), TopNComparator());
+
+    // get topn result
+    TopNCounter* topn_dst = reinterpret_cast<TopNCounter*>(dst.ptr);
+    std::vector<Counter> topn_sort_vec;
+    topn_dst->sort_retain(TOPN_NUM, &topn_sort_vec);
+
+    // Never read past the end of either ranking when fewer than TOPN_NUM
+    // distinct items were produced.
+    const size_t compared = std::min<size_t>(TOPN_NUM,
+            std::min(accuracy_sort_vec.size(), topn_sort_vec.size()));
+    uint32_t error = 0;
+    for (size_t i = 0; i < compared; ++i) {
+        const Counter& accuracy_counter = accuracy_sort_vec[i];
+        const Counter& topn_counter = topn_sort_vec[i];
+        if (accuracy_counter.get_count() != topn_counter.get_count()) {
+            ++error;
+            LOG(INFO) << "Failed";
+            LOG(INFO) << "accuracy counter : (" << accuracy_counter.get_item() << ", " << accuracy_counter.get_count() << ")";
+            LOG(INFO) << "topn counter : (" << topn_counter.get_item() << ", " << topn_counter.get_count() << ")";
+        }
+    }
+    LOG(INFO) << "Total errors : " << error;
+    // topn_finalize() also releases the merged state.
+    TopNFunctions::topn_finalize(ctx, dst);
+}
+
+// Runs the accuracy measurement for every combination of key space, space
+// expand rate and Zipf exponent. The results are only logged, not asserted.
+TEST_F(TopNFunctionsTest, topn_accuracy) {
+    const std::vector<int> key_spaces({1000, 10000, 100000, 500000});
+    const std::vector<int> expand_rates({20, 50, 100});
+    const std::vector<double> exponents({0.5, 0.6, 1.0});
+    for (int key_space : key_spaces) {
+        for (int expand_rate : expand_rates) {
+            for (double exponent : exponents) {
+                test_topn_accuracy(ctx, key_space, expand_rate, exponent);
+            }
+        }
+    }
+}
+
+// "a" x10, "b" x2 and "c" x1 with N = 2 must report only "a" and "b".
+TEST_F(TopNFunctionsTest, topn_update) {
+    StringVal state;
+    TopNFunctions::topn_init(ctx, &state);
+
+    StringVal item_a("a");
+    for (int round = 0; round < 10; ++round) {
+        TopNFunctions::topn_update(ctx, item_a, 2, &state);
+    }
+
+    StringVal item_b("b");
+    for (int round = 0; round < 2; ++round) {
+        TopNFunctions::topn_update(ctx, item_b, 2, &state);
+    }
+
+    StringVal item_c("c");
+    TopNFunctions::topn_update(ctx, item_c, 2, &state);
+
+    StringVal expected("{\"a\":10,\"b\":2}");
+    StringVal result = TopNFunctions::topn_finalize(ctx, state);
+    ASSERT_EQ(expected, result);
+}
+
+// Two partial states ("a" x10 + "b" x8, and "a" x10 + "c" x6) are serialized
+// and merged; with N = 2 the result must be "a" with 20 and "b" with 8.
+TEST_F(TopNFunctionsTest, topn_merge) {
+    StringVal left;
+    TopNFunctions::topn_init(ctx, &left);
+    StringVal right;
+    TopNFunctions::topn_init(ctx, &right);
+
+    StringVal item_a("a");
+    for (int round = 0; round < 10; ++round) {
+        TopNFunctions::topn_update(ctx, item_a, 2, &left);
+        TopNFunctions::topn_update(ctx, item_a, 2, &right);
+    }
+    StringVal item_b("b");
+    for (int round = 0; round < 8; ++round) {
+        TopNFunctions::topn_update(ctx, item_b, 2, &left);
+    }
+    StringVal item_c("c");
+    for (int round = 0; round < 6; ++round) {
+        TopNFunctions::topn_update(ctx, item_c, 2, &right);
+    }
+
+    StringVal left_bytes = TopNFunctions::topn_serialize(ctx, left);
+    StringVal right_bytes = TopNFunctions::topn_serialize(ctx, right);
+
+    StringVal merged;
+    TopNFunctions::topn_init(ctx, &merged);
+    TopNFunctions::topn_merge(ctx, left_bytes, &merged);
+    TopNFunctions::topn_merge(ctx, right_bytes, &merged);
+
+    StringVal expected("{\"a\":20,\"b\":8}");
+    StringVal result = TopNFunctions::topn_finalize(ctx, merged);
+    ASSERT_EQ(expected, result);
+}
+
+// NULL inputs are ignored, so a state that only saw NULLs finalizes to "{}"
+// even after a serialize/merge round trip.
+TEST_F(TopNFunctionsTest, test_null_value) {
+    StringVal partial;
+    TopNFunctions::topn_init(ctx, &partial);
+    for (int round = 0; round < 10; ++round) {
+        TopNFunctions::topn_update(ctx, IntVal::null(), 2, &partial);
+    }
+    StringVal partial_bytes = TopNFunctions::topn_serialize(ctx, partial);
+
+    StringVal merged;
+    TopNFunctions::topn_init(ctx, &merged);
+    TopNFunctions::topn_merge(ctx, partial_bytes, &merged);
+
+    StringVal expected("{}");
+    StringVal result = TopNFunctions::topn_finalize(ctx, merged);
+    ASSERT_EQ(expected, result);
+}
+
+// A DateTimeVal item is keyed by its "YYYY-MM-DD hh:mm:ss" string form.
+TEST_F(TopNFunctionsTest, test_date_type) {
+    DateTimeValue moment(20201001000000);
+    doris_udf::DateTimeVal moment_val;
+    moment.to_datetime_val(&moment_val);
+
+    StringVal partial;
+    TopNFunctions::topn_init(ctx, &partial);
+    for (int round = 0; round < 10; ++round) {
+        TopNFunctions::topn_update(ctx, moment_val, 1, &partial);
+    }
+    StringVal partial_bytes = TopNFunctions::topn_serialize(ctx, partial);
+
+    StringVal merged;
+    TopNFunctions::topn_init(ctx, &merged);
+    TopNFunctions::topn_merge(ctx, partial_bytes, &merged);
+
+    StringVal expected("{\"2020-10-01 00:00:00\":10}");
+    StringVal result = TopNFunctions::topn_finalize(ctx, merged);
+    ASSERT_EQ(expected, result);
+}
+
+}
+
+// Test binary entry point: hands the command line to GoogleTest and runs
+// every registered test, returning GoogleTest's result as the exit code.
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/be/test/exprs/zipf_distribution.h 
b/be/test/exprs/zipf_distribution.h
new file mode 100644
index 0000000..8ae2fda
--- /dev/null
+++ b/be/test/exprs/zipf_distribution.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <random>
+
+/** Refer to 
https://stackoverflow.com/questions/9983239/how-to-generate-zipf-distributed-numbers-efficiently
+ *  Zipf-like random distribution.
+ *
+ * "Rejection-inversion to generate variates from monotone discrete
+ * distributions", Wolfgang Hörmann and Gerhard Derflinger
+ * ACM TOMACS 6.3 (1996): 169-184
+ */
+template<class IntType = unsigned long, class RealType = double>
+class zipf_distribution
+{
+public:
+    typedef RealType input_type;
+    typedef IntType result_type;
+
+    static_assert(std::numeric_limits<IntType>::is_integer, "");
+    static_assert(!std::numeric_limits<RealType>::is_integer, "");
+
+    zipf_distribution(const IntType n=std::numeric_limits<IntType>::max(),
+                      const RealType q=1.0)
+            : n(n)
+            , q(q)
+            , H_x1(H(1.5) - 1.0)
+            , H_n(H(n + 0.5))
+            , dist(H_x1, H_n)
+    {}
+
+    IntType operator()(std::mt19937& rng)
+    {
+        while (true) {
+            const RealType u = dist(rng);
+            const RealType x = H_inv(u);
+            const IntType  k = clamp<IntType>(std::round(x), 1, n);
+            if (u >= H(k + 0.5) - h(k)) {
+                return k;
+            }
+        }
+    }
+
+private:
+    /** Clamp x to [min, max]. */
+    template<typename T>
+    static constexpr T clamp(const T x, const T min, const T max)
+    {
+        return std::max(min, std::min(max, x));
+    }
+
+    /** (exp(x) - 1) / x, using a Taylor-series fallback when |x| <= epsilon */
+    static double
+    expxm1bx(const double x)
+    {
+        return (std::abs(x) > epsilon)
+               ? std::expm1(x) / x
+               : (1.0 + x/2.0 * (1.0 + x/3.0 * (1.0 + x/4.0)));
+    }
+
+    /** H(x) = log(x) if q == 1, (x^(1-q) - 1)/(1 - q) otherwise.
+     * H(x) is an integral of h(x).
+     *
+     * Note the numerator is one less than in the paper, in order to work with
+     * all positive q.
+     */
+    const RealType H(const RealType x)
+    {
+        const RealType log_x = std::log(x);
+        return expxm1bx((1.0 - q) * log_x) * log_x;
+    }
+
+    /** log(1 + x) / x */
+    static RealType
+    log1pxbx(const RealType x)
+    {
+        return (std::abs(x) > epsilon)
+               ? std::log1p(x) / x
+               : 1.0 - x * ((1/2.0) - x * ((1/3.0) - x * (1/4.0)));
+    }
+
+    /** The inverse function of H(x) */
+    const RealType H_inv(const RealType x)
+    {
+        const RealType t = std::max(-1.0, x * (1.0 - q));
+        return std::exp(log1pxbx(t) * x);
+    }
+
+    /** The hat function h(x) = 1 / (x ^ q) */
+    const RealType h(const RealType x)
+    {
+        return std::exp(-q * std::log(x));
+    }
+
+    static constexpr RealType epsilon = 1e-8;
+
+    IntType                                  n;     ///< Number of elements
+    RealType                                 q;     ///< Exponent
+    RealType                                 H_x1;  ///< H(1.5) - 1
+    RealType                                 H_n;   ///< H(n + 0.5)
+    std::uniform_real_distribution<RealType> dist;  ///< Uniform over [H_x1, H_n)
+};
\ No newline at end of file
diff --git a/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md 
b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md
new file mode 100644
index 0000000..4396728
--- /dev/null
+++ b/docs/en/sql-reference/sql-functions/aggregate-functions/topn.md
@@ -0,0 +1,61 @@
+---
+{
+    "title": "TOPN",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# TOPN
+## description
+### Syntax
+
+`topn(expr, INT top_num[, INT space_expand_rate])`
+
+The topn function uses the Space-Saving algorithm to calculate the top_num 
most frequent items in expr. The result contains the 
+frequent items and their occurrence counts, and is an approximation.
+
+The space_expand_rate parameter is optional and is used to set the number of 
counters used in the Space-Saving algorithm
+```
+counter numbers = top_num * space_expand_rate
+```
+The higher the value of space_expand_rate, the more accurate the result will be. The 
default value is 50.
+
+## example
+```
+MySQL [test]> select topn(keyword,10) from keyword_table where date>= 
'2020-06-01' and date <= '2020-06-19' ;
++------------------------------------------------------------------------------------------------------------+
+| topn(`keyword`, 10)                                                          
                              |
++------------------------------------------------------------------------------------------------------------+
+| a:157, b:138, c:133, d:133, e:131, f:127, g:124, h:122, i:117, k:117         
                              |
++------------------------------------------------------------------------------------------------------------+
+
+MySQL [test]> select date,topn(keyword,10,100) from keyword_table where date>= 
'2020-06-17' and date <= '2020-06-19' group by date;
++------------+-----------------------------------------------------------------------------------------------+
+| date       | topn(`keyword`, 10, 100)                                        
                              |
++------------+-----------------------------------------------------------------------------------------------+
+| 2020-06-19 | a:11, b:8, c:8, d:7, e:7, f:7, g:7, h:7, i:7, j:7               
                              |
+| 2020-06-18 | a:10, b:8, c:7, f:7, g:7, i:7, k:7, l:7, m:6, d:6               
                              |
+| 2020-06-17 | a:9, b:8, c:8, j:8, d:7, e:7, f:7, h:7, i:7, k:7                
                              |
++------------+-----------------------------------------------------------------------------------------------+
+```
+## keyword
+TOPN
\ No newline at end of file
diff --git a/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md 
b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md
new file mode 100644
index 0000000..11c4544
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-functions/aggregate-functions/topn.md
@@ -0,0 +1,60 @@
+---
+{
+    "title": "TOPN",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# TOPN
+## description
+### Syntax
+
+`topn(expr, INT top_num[, INT space_expand_rate])`
+
+该topn函数使用Space-Saving算法计算expr中的top_num个频繁项,结果为频繁项及其出现次数,该结果为近似值
+
+space_expand_rate参数是可选项,该值用来设置Space-Saving算法中使用的counter个数
+```
+counter numbers = top_num * space_expand_rate
+```
+space_expand_rate的值越大,结果越准确,默认值为50
+
+## example
+```
+MySQL [test]> select topn(keyword,10) from keyword_table where date>= 
'2020-06-01' and date <= '2020-06-19' ;
++------------------------------------------------------------------------------------------------------------+
+| topn(`keyword`, 10)                                                          
                              |
++------------------------------------------------------------------------------------------------------------+
+| a:157, b:138, c:133, d:133, e:131, f:127, g:124, h:122, i:117, k:117         
                              |
++------------------------------------------------------------------------------------------------------------+
+
+MySQL [test]> select date,topn(keyword,10,100) from keyword_table where date>= 
'2020-06-17' and date <= '2020-06-19' group by date;
++------------+-----------------------------------------------------------------------------------------------+
+| date       | topn(`keyword`, 10, 100)                                        
                              |
++------------+-----------------------------------------------------------------------------------------------+
+| 2020-06-19 | a:11, b:8, c:8, d:7, e:7, f:7, g:7, h:7, i:7, j:7               
                              |
+| 2020-06-18 | a:10, b:8, c:7, f:7, g:7, i:7, k:7, l:7, m:6, d:6               
                              |
+| 2020-06-17 | a:9, b:8, c:8, j:8, d:7, e:7, f:7, h:7, i:7, k:7                
                              |
++------------+-----------------------------------------------------------------------------------------------+
+```
+## keyword
+TOPN
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 0b0ef1b..9f41e69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.ScalarFunction;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
@@ -455,6 +456,30 @@ public class FunctionCallExpr extends Expr {
                 }
             }
         }
+
+        if (fnName.getFunction().equalsIgnoreCase("topn")) {
+            if (children.size() != 2 && children.size() != 3) {
+                throw new AnalysisException("topn(expr, INT [, B]) requires 
two or three parameters");
+            }
+            if (!getChild(1).isConstant() || 
!getChild(1).getType().isIntegerType()) {
+                throw new AnalysisException("topn requires second parameter 
must be a constant Integer Type: "
+                        + this.toSql());
+            }
+            if (getChild(1).getType() != ScalarType.INT) {
+                Expr e = getChild(1).castTo(ScalarType.INT);
+                setChild(1, e);
+            }
+            if (children.size() == 3) {
+                if (!getChild(2).isConstant() || 
!getChild(2).getType().isIntegerType()) {
+                    throw new AnalysisException("topn requires the third 
parameter must be a constant Integer Type: "
+                            + this.toSql());
+                }
+                if (getChild(2).getType() != ScalarType.INT) {
+                    Expr e = getChild(2).castTo(ScalarType.INT);
+                    setChild(2, e);
+                }
+            }
+        }
     }
 
     // Provide better error message for some aggregate builtins. These can be
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index 579e19c..10ecbdf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -817,6 +817,70 @@ public class FunctionSet {
                             
"_ZN5doris15BitmapFunctions25bitmap_intersect_finalizeINS_11StringValueEEEN9doris_udf9BigIntValEPNS3_15FunctionContextERKNS3_9StringValE")
                     .build();
 
+    private static final Map<Type, String> TOPN_UPDATE_SYMBOL =
+            ImmutableMap.<Type, String>builder()
+                    .put(Type.BOOLEAN,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.TINYINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.SMALLINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.INT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_PNS2_9StringValE")
+                    .put(Type.BIGINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.FLOAT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.DOUBLE,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.CHAR,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
+                    .put(Type.VARCHAR,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
+                    .put(Type.DATE,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.DATETIME,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.DECIMAL,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.DECIMALV2,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .put(Type.LARGEINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
+                    .build();
+
+    private static final Map<Type, String> TOPN_UPDATE_MORE_PARAM_SYMBOL =
+            ImmutableMap.<Type, String>builder()
+                    .put(Type.BOOLEAN,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.TINYINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.SMALLINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.INT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_SA_PNS2_9StringValE")
+                    .put(Type.BIGINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.FLOAT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.DOUBLE,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.CHAR,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_")
+                    .put(Type.VARCHAR,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_")
+                    .put(Type.DATE,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.DATETIME,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.DECIMAL,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.DECIMALV2,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .put(Type.LARGEINT,
+                            
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
+                    .build();
+
     public Function getFunction(Function desc, Function.CompareMode mode) {
         List<Function> fns = functions.get(desc.functionName());
         if (fns == null) {
@@ -1185,6 +1249,26 @@ public class FunctionSet {
                     
"_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
                     true, false, true));
 
+            // TopN
+            if (TOPN_UPDATE_SYMBOL.containsKey(t)) {
+                addBuiltin(AggregateFunction.createBuiltin("topn",
+                        Lists.newArrayList(t, Type.INT), Type.VARCHAR, 
Type.VARCHAR,
+                        
"_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
+                        TOPN_UPDATE_SYMBOL.get(t),
+                        
"_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
+                        
"_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+                        
"_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+                        true, false, true));
+                addBuiltin(AggregateFunction.createBuiltin("topn",
+                        Lists.newArrayList(t, Type.INT, Type.INT), 
Type.VARCHAR, Type.VARCHAR,
+                        
"_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
+                        TOPN_UPDATE_MORE_PARAM_SYMBOL.get(t),
+                        
"_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
+                        
"_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+                        
"_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
+                        true, false, true));
+            }
+
             if (STDDEV_UPDATE_SYMBOL.containsKey(t)) {
                 addBuiltin(AggregateFunction.createBuiltin("stddev",
                         Lists.newArrayList(t), STDDEV_RETTYPE_SYMBOL.get(t), 
Type.VARCHAR,
diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto
index 846b25a..7326b88 100644
--- a/gensrc/proto/olap_common.proto
+++ b/gensrc/proto/olap_common.proto
@@ -48,3 +48,13 @@ enum CompressKind {
     COMPRESS_LZ4 = 2;
 }
 
+message PCounter {
+    required string item = 1;
+    required uint64 count = 2;
+}
+
+message PTopNCounter {
+    required uint32 top_num = 1;
+    required uint32 space_expand_rate = 2;
+    repeated PCounter counter = 3;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to