This is an automated email from the ASF dual-hosted git repository.

smityz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9832438  feat: add a function test for read/write throttling (#842)
9832438 is described below

commit 983243832bf669830661acad4a3ce942e218f0db
Author: Smilencer <[email protected]>
AuthorDate: Tue Dec 7 15:33:17 2021 +0800

    feat: add a function test for read/write throttling (#842)
---
 rdsn                                           |   2 +-
 src/test/function_test/run.sh                  |   2 +
 src/test/function_test/test_detect_hotspot.cpp |  22 +-
 src/test/function_test/test_throttle.cpp       | 785 +++++++++++++++++++++++++
 src/test/function_test/utils.h                 |  50 ++
 5 files changed, 840 insertions(+), 21 deletions(-)

diff --git a/rdsn b/rdsn
index fc41809..a81b31c 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit fc41809ce1622a47a535a2316df91d4d626f35ed
+Subproject commit a81b31ce763ea71d8ae57ae3e88520123d47fd36
diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index f7abaa3..8995173 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -79,4 +79,6 @@ if [ $on_travis == "NO" ]; then
     exit_if_fail $? "run test backup_restore_test load failed: $test_case 
$config_file $table_name"
     GTEST_OUTPUT="xml:$REPORT_DIR/split.xml" 
GTEST_FILTER="partition_split_test.*" ./$test_case $config_file $table_name
     exit_if_fail $? "run test split failed: $test_case $config_file 
$table_name"
+    GTEST_OUTPUT="xml:$REPORT_DIR/test_throttle.xml" 
GTEST_FILTER="test_throttle.*" ./$test_case $config_file $table_name
+    exit_if_fail $? "run test test_throttle load failed: $test_case 
$config_file $table_name"
 fi
diff --git a/src/test/function_test/test_detect_hotspot.cpp 
b/src/test/function_test/test_detect_hotspot.cpp
index d60b90a..8eb2ad1 100644
--- a/src/test/function_test/test_detect_hotspot.cpp
+++ b/src/test/function_test/test_detect_hotspot.cpp
@@ -23,33 +23,15 @@
 #include <dsn/dist/replication/replication_ddl_client.h>
 #include <pegasus/client.h>
 #include <gtest/gtest.h>
-#include <boost/lexical_cast.hpp>
-#include <dsn/utility/rand.h>
 
 #include "base/pegasus_const.h"
 #include "global_env.h"
+#include "utils.h"
 
 using namespace ::dsn;
 using namespace ::dsn::replication;
 using namespace pegasus;
 
-static std::string generate_hash_key_by_random(bool is_hotkey, int probability 
= 100)
-{
-    if (is_hotkey && (dsn::rand::next_u32(100) < probability)) {
-        return "ThisisahotkeyThisisahotkey";
-    }
-    static const std::string chars("abcdefghijklmnopqrstuvwxyz"
-                                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
-                                   "1234567890"
-                                   "!@#$%^&*()"
-                                   "`~-_=+[{]{\\|;:'\",<.>/? ");
-    std::string result;
-    for (int i = 0; i < 20; i++) {
-        result += chars[dsn::rand::next_u32(chars.size())];
-    }
-    return result;
-}
-
 enum detection_type
 {
     read_data,
@@ -105,7 +87,7 @@ public:
 
         for (int i = 0; dsn_now_s() - start < time_duration; ++i %= 1000) {
             std::string index = std::to_string(i);
-            std::string h_key = generate_hash_key_by_random(kt, 50);
+            std::string h_key = generate_hotkey(kt, 50);
             std::string s_key = "sortkey_" + index;
             std::string value = "value_" + index;
             if (dt == detection_type::write_data) {
diff --git a/src/test/function_test/test_throttle.cpp 
b/src/test/function_test/test_throttle.cpp
new file mode 100644
index 0000000..38c9fe7
--- /dev/null
+++ b/src/test/function_test/test_throttle.cpp
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <dsn/utility/filesystem.h>
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <pegasus/client.h>
+#include <gtest/gtest.h>
+#include <dsn/utility/TokenBucket.h>
+#include <dsn/service_api_cpp.h>
+#include <dsn/dist/fmt_logging.h>
+#include <fstream>
+
+#include "base/pegasus_const.h"
+#include "global_env.h"
+#include "utils.h"
+
+using namespace dsn;
+using namespace dsn::replication;
+using namespace pegasus;
+
+enum class throttle_type
+{
+    read_by_qps,
+    read_by_size,
+    write_by_qps,
+    write_by_size
+};
+
+enum class operation_type
+{
+    get,
+    multi_get,
+    set,
+    multi_set
+};
+
+struct throttle_test_plan
+{
+    std::string test_plan_case = "";
+    operation_type ot = operation_type::get;
+    int single_value_sz = 0;
+    int multi_count = 0;
+    int limit_qps = 0;
+    bool random_value_size = false;
+    bool is_hotkey = false;
+};
+
+#define ToString(x) #x
+
+#define TIMELY_RECORD(time_interval, is_reject, size)                          
                    \
+    do {                                                                       
                    \
+        records[ToString(time_interval##_query_times)]++;                      
                    \
+        records[ToString(time_interval##_query_size)] += size;                 
                    \
+        if (is_reject) {                                                       
                    \
+            records[ToString(time_interval##_reject_times)]++;                 
                    \
+            records[ToString(time_interval##_reject_size)] += size;            
                    \
+        } else {                                                               
                    \
+            records[ToString(time_interval##_successful_times)]++;             
                    \
+            records[ToString(time_interval##_successful_size)] += size;        
                    \
+        }                                                                      
                    \
+    } while (0)
+
+struct throttle_test_recorder
+{
+    uint64_t start_time_ms;
+    uint64_t duration_ms;
+    std::map<std::string, uint64_t> records;
+    std::string test_name;
+    std::vector<std::string> parameter_seq = {"total_qps",
+                                              "total_size_per_sec",
+                                              "first_10_ms_successful_times",
+                                              "first_100_ms_successful_times",
+                                              "first_1000_ms_successful_times",
+                                              "first_5000_ms_successful_times",
+                                              "first_10_ms_successful_size",
+                                              "first_100_ms_successful_size",
+                                              "first_1000_ms_successful_size",
+                                              "first_5000_ms_successful_size"};
+
+    void start_test(const std::string &test_case, uint64_t time_duration_s)
+    {
+        test_name = test_case;
+        start_time_ms = dsn_now_ms();
+        duration_ms = time_duration_s * 1000;
+        records.emplace(std::make_pair("duration_ms", duration_ms));
+    }
+
+    bool is_time_up() { return dsn_now_ms() - start_time_ms > duration_ms; }
+
+    void record(uint64_t size, bool is_reject)
+    {
+        if (is_time_up()) {
+            return;
+        }
+        auto now_ns = dsn_now_ms();
+        if (now_ns - start_time_ms <= 10) {
+            TIMELY_RECORD(first_10_ms, is_reject, size);
+        }
+        if (now_ns - start_time_ms <= 100) {
+            TIMELY_RECORD(first_100_ms, is_reject, size);
+        }
+        if (now_ns - start_time_ms <= 1000) {
+            TIMELY_RECORD(first_1000_ms, is_reject, size);
+        }
+        if (now_ns - start_time_ms <= 5000) {
+            TIMELY_RECORD(first_5000_ms, is_reject, size);
+        }
+        TIMELY_RECORD(total, is_reject, size);
+
+        records["total_qps"] = records["total_successful_times"] / 
(duration_ms / 1000);
+        records["total_size_per_sec"] = records["total_successful_size"] / 
(duration_ms / 1000);
+    }
+
+    void print_results(const std::string &dir)
+    {
+        std::streambuf *psbuf, *backup;
+        std::ofstream file;
+        file.open(dir, std::ios::out | std::ios::app);
+        backup = std::cout.rdbuf();
+        psbuf = file.rdbuf();
+        std::cout.rdbuf(psbuf);
+
+        std::cout << "test case: " << test_name << std::endl;
+        for (const auto &iter : parameter_seq) {
+            std::cout << iter << ": " << records[iter] << std::endl;
+        }
+        std::cout << std::endl;
+
+        std::cout.rdbuf(backup);
+        file.close();
+
+        return;
+    }
+};
+
+const int test_hashkey_len = 50;
+const int test_sortkey_len = 50;
+
+// read/write throttle function test
+// the details of records are saved in 
`./src/builder/test/function_test/throttle_test_result.txt`
+class test_throttle : public testing::Test
+{
+public:
+    virtual void SetUp() override
+    {
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("pwd");
+        system("./run.sh clear_onebox");
+        system("cp src/server/config.min.ini config-server-test-hotspot.ini");
+        system("sed -i \"/^\\s*enable_detect_hotkey/c enable_detect_hotkey = "
+               "true\" config-server-test-hotspot.ini");
+        system("./run.sh start_onebox -c -w --config_path 
config-server-test-hotspot.ini");
+        std::this_thread::sleep_for(std::chrono::seconds(3));
+
+        std::vector<dsn::rpc_address> meta_list;
+        replica_helper::load_meta_servers(
+            meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), 
"single_master_cluster");
+
+        ddl_client = std::make_shared<replication_ddl_client>(meta_list);
+        pg_client =
+            
pegasus::pegasus_client_factory::get_client("single_master_cluster", 
app_name.c_str());
+
+        auto err = ddl_client->create_app(app_name.c_str(), "pegasus", 4, 3, 
{}, false);
+        ASSERT_EQ(dsn::ERR_OK, err);
+    }
+
+    virtual void TearDown() override
+    {
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+
+    void set_throttle(throttle_type type, uint64_t value)
+    {
+        std::vector<std::string> keys, values;
+        if (type == throttle_type::read_by_qps) {
+            keys.emplace_back("replica.read_throttling");
+            values.emplace_back(fmt::format("{}*reject*200", value));
+        } else if (type == throttle_type::read_by_size) {
+            keys.emplace_back("replica.read_throttling_by_size");
+            values.emplace_back(fmt::format("{}", value));
+        } else if (type == throttle_type::write_by_qps) {
+            keys.emplace_back("replica.write_throttling");
+            values.emplace_back(fmt::format("{}*reject*10", value));
+        } else if (type == throttle_type::write_by_size) {
+            keys.emplace_back("replica.write_throttling_by_size");
+            values.emplace_back(fmt::format("{}*reject*200", value));
+        }
+        auto resp = ddl_client->set_app_envs(app_name, keys, values);
+        dassert_f(
+            resp.get_error().code() == ERR_OK, "Set env failed: {}", 
resp.get_value().hint_message);
+    }
+
+    void restore_throttle()
+    {
+        std::map<std::string, std::string> envs;
+        ddl_client->get_app_envs(app_name, envs);
+        std::vector<std::string> keys;
+        for (const auto &iter : envs) {
+            keys.emplace_back(iter.first);
+        }
+        auto resp = ddl_client->del_app_envs(app_name, keys);
+        dassert_f(resp == ERR_OK, "Del env failed");
+    }
+
+    throttle_test_recorder start_test(throttle_test_plan test_plan, uint64_t 
time_duration_s = 10)
+    {
+        std::cout << fmt::format("start test, on {}", 
test_plan.test_plan_case) << std::endl;
+
+        dassert(pg_client, "pg_client is nullptr");
+
+        throttle_test_recorder r;
+        r.start_test(test_plan.test_plan_case, time_duration_s);
+
+        bool is_running = true;
+        std::atomic<int64_t> ref_count(0);
+
+        while (!r.is_time_up()) {
+            auto h_key = generate_hotkey(test_plan.is_hotkey, 75, 
test_hashkey_len);
+            auto s_key = generate_random_string(test_sortkey_len);
+            auto value = generate_random_string(test_plan.random_value_size
+                                                    ? 
dsn::rand::next_u32(test_plan.single_value_sz)
+                                                    : 
test_plan.single_value_sz);
+            auto sortkey_value_pairs = generate_sortkey_value_map(
+                generate_str_vector_by_random(test_sortkey_len, 
test_plan.multi_count),
+                generate_str_vector_by_random(
+                    test_plan.single_value_sz, test_plan.multi_count, 
test_plan.random_value_size));
+
+            if (test_plan.ot == operation_type::set) {
+                ref_count++;
+                pg_client->async_set(
+                    h_key,
+                    s_key,
+                    value,
+                    [&, h_key, s_key, value](int ec, 
pegasus_client::internal_info &&info) {
+                        if (!is_running) {
+                            ref_count--;
+                            return;
+                        }
+                        dassert_f(ec == PERR_OK || ec == PERR_APP_BUSY,
+                                  "get/set data failed, error code:{}",
+                                  ec);
+                        r.record(value.size() + h_key.size() + s_key.size(), 
ec == PERR_APP_BUSY);
+                        ref_count--;
+                    });
+            } else if (test_plan.ot == operation_type::multi_set) {
+                ref_count++;
+                pg_client->async_multi_set(
+                    h_key,
+                    sortkey_value_pairs,
+                    [&, h_key, sortkey_value_pairs](int ec, 
pegasus_client::internal_info &&info) {
+                        if (!is_running) {
+                            ref_count--;
+                            return;
+                        }
+                        dassert_f(ec == PERR_OK || ec == PERR_APP_BUSY,
+                                  "get/set data failed, error code:{}",
+                                  ec);
+                        int total_size = 0;
+                        for (const auto &iter : sortkey_value_pairs) {
+                            total_size += iter.second.size();
+                        }
+                        r.record(total_size + h_key.size(), ec == 
PERR_APP_BUSY);
+                        ref_count--;
+                    });
+            } else if (test_plan.ot == operation_type::get) {
+                ref_count++;
+                pg_client->async_set(
+                    h_key,
+                    s_key,
+                    value,
+                    [&, h_key, s_key, value](int ec_write, 
pegasus_client::internal_info &&info) {
+                        if (!is_running) {
+                            ref_count--;
+                            return;
+                        }
+                        dassert_f(ec_write == PERR_OK, "set data failed, error 
code:{}", ec_write);
+                        ref_count++;
+                        pg_client->async_get(
+                            h_key,
+                            s_key,
+                            [&, h_key, s_key, value](int ec_read,
+                                                     std::string &&val,
+                                                     
pegasus_client::internal_info &&info) {
+                                if (!is_running) {
+                                    ref_count--;
+                                    return;
+                                }
+                                dassert_f(ec_read == PERR_OK || ec_read == 
PERR_APP_BUSY,
+                                          "get data failed, error code:{}",
+                                          ec_read);
+                                r.record(value.size() + h_key.size() + 
s_key.size(),
+                                         ec_read == PERR_APP_BUSY);
+                                ref_count--;
+                            });
+                        ref_count--;
+                    });
+            } else if (test_plan.ot == operation_type::multi_get) {
+                ref_count++;
+                pg_client->async_multi_set(
+                    h_key,
+                    sortkey_value_pairs,
+                    [&, h_key](int ec_write, pegasus_client::internal_info 
&&info) {
+                        if (!is_running) {
+                            ref_count--;
+                            return;
+                        }
+                        dassert_f(ec_write == PERR_OK, "set data failed, error 
code:{}", ec_write);
+                        ref_count++;
+                        std::set<std::string> empty_sortkeys;
+                        pg_client->async_multi_get(
+                            h_key,
+                            empty_sortkeys,
+                            [&, h_key](int ec_read,
+                                       std::map<std::string, std::string> 
&&values,
+                                       pegasus_client::internal_info &&info) {
+                                if (!is_running) {
+                                    ref_count--;
+                                    return;
+                                }
+                                dassert_f(ec_read == PERR_OK || ec_read == 
PERR_APP_BUSY,
+                                          "get data failed, error code:{}",
+                                          ec_read);
+                                int total_size = 0;
+                                for (const auto &iter : values) {
+                                    total_size += iter.second.size();
+                                }
+                                r.record(total_size + h_key.size(), ec_read == 
PERR_APP_BUSY);
+                                ref_count--;
+                            });
+                        ref_count--;
+                    });
+            }
+        }
+        is_running = false;
+        while (ref_count.load() != 0) {
+            sleep(1);
+        }
+
+        
r.print_results("./src/builder/test/function_test/throttle_test_result.txt");
+        return r;
+    }
+
+    const std::string app_name = "throttle_test";
+    std::shared_ptr<replication_ddl_client> ddl_client;
+    pegasus::pegasus_client *pg_client;
+};
+
+TEST_F(test_throttle, test)
+{
+    throttle_test_plan plan;
+    throttle_test_recorder result;
+
+    plan = {"set test / throttle by size / normal value size", 
operation_type::set, 1024, 1, 50};
+    set_throttle(throttle_type::write_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {"set test / throttle by qps / normal value size", 
operation_type::set, 1024, 1, 50};
+    set_throttle(throttle_type::write_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env " << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {"get test / throttle by size / normal value size", 
operation_type::get, 1024, 1, 50};
+    set_throttle(throttle_type::read_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"get test / throttle by qps", operation_type::get, 1024, 1, 50};
+    set_throttle(throttle_type::read_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {"multi_get test / throttle by size / normal value size",
+            operation_type::multi_get,
+            1024,
+            50,
+            50};
+    set_throttle(throttle_type::read_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"multi_set test / throttle by size / normal value size",
+            operation_type::multi_set,
+            1024,
+            50,
+            50};
+    set_throttle(throttle_type::write_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+    plan = {
+        "set test / throttle by qps&size / normal value size", 
operation_type::set, 1024, 1, 50};
+    set_throttle(throttle_type::write_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    set_throttle(throttle_type::write_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {
+        "get test / throttle by qps&size / normal value size", 
operation_type::get, 1024, 1, 50};
+    set_throttle(throttle_type::read_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    set_throttle(throttle_type::read_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    // mix throttle case
+    plan = {"set test / throttle by qps&size,loose size throttle / normal 
value size",
+            operation_type::set,
+            1024,
+            1,
+            50};
+    set_throttle(throttle_type::write_by_size,
+                 plan.limit_qps *
+                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * 1000);
+    set_throttle(throttle_type::write_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {"get test / throttle by qps&size,loose size throttle/normal value 
size",
+            operation_type::get,
+            1024,
+            1,
+            50};
+    set_throttle(throttle_type::read_by_size,
+                 plan.limit_qps *
+                     (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * 1000);
+    set_throttle(throttle_type::read_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {"set test / throttle by qps&size,loose qps throttle / normal value 
size",
+            operation_type::set,
+            1024,
+            1,
+            50};
+    set_throttle(throttle_type::write_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    set_throttle(throttle_type::write_by_qps, plan.limit_qps * 1000);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"get test / throttle by qps&size,loose qps throttle/normal value 
size",
+            operation_type::get,
+            1024,
+            1,
+            50};
+    set_throttle(throttle_type::read_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    set_throttle(throttle_type::read_by_qps, plan.limit_qps * 1000);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    // big value test can't run normally in the function test
+    plan = {"set test / throttle by size / 20kb value size", 
operation_type::set, 1024 * 20, 1, 50};
+    set_throttle(throttle_type::write_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"get test / throttle by size / 20kb value size", 
operation_type::get, 1024 * 20, 1, 50};
+    set_throttle(throttle_type::read_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"set test / throttle by size / 50kb value size", 
operation_type::set, 1024 * 50, 1, 50};
+    set_throttle(throttle_type::write_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"get test / throttle by size / 50kb value size", 
operation_type::get, 1024 * 50, 1, 50};
+    set_throttle(throttle_type::read_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"set test / throttle by size / 100b value size", 
operation_type::set, 100, 1, 50};
+    set_throttle(throttle_type::write_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"get test / throttle by size / 100b value size", 
operation_type::get, 100, 1, 50};
+    set_throttle(throttle_type::read_by_size,
+                 (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                     plan.multi_count * plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"set test / throttle by size / 10b value size", 
operation_type::set, 10, 1, 50};
+    set_throttle(throttle_type::write_by_size,
+                 (plan.single_value_sz + test_hashkey_len + test_sortkey_len) 
* plan.multi_count *
+                     plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    plan = {"get test / throttle by size / 10b value size", 
operation_type::get, 10, 1, 50};
+    set_throttle(throttle_type::read_by_size,
+                 (plan.single_value_sz + test_hashkey_len + test_sortkey_len) 
* plan.multi_count *
+                     plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)(plan.single_value_sz + test_hashkey_len + 
test_sortkey_len) *
+                  plan.multi_count * plan.limit_qps * 0.7);
+
+    //  random value case
+    plan = {"multi_get test / throttle by size / random value size",
+            operation_type::multi_get,
+            1024 * 5,
+            50,
+            50,
+            true};
+    set_throttle(throttle_type::read_by_size, 5000000);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"], (uint64_t)5000000 * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"], (uint64_t)5000000 * 0.7);
+
+    plan = {"multi_set test / throttle by size / random value size",
+            operation_type::multi_set,
+            1024 * 5,
+            50,
+            50,
+            true};
+    set_throttle(throttle_type::write_by_size, 5000000);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"], (uint64_t)5000000 * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"], (uint64_t)5000000 * 0.7);
+
+    // hotkey test
+    plan = {
+        "get test / throttle by qps / hotkey test", operation_type::get, 1024, 
1, 50, false, true};
+    set_throttle(throttle_type::read_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {
+        "set test / throttle by qps / hotkey test", operation_type::set, 1024, 
1, 50, false, true};
+    set_throttle(throttle_type::write_by_qps, plan.limit_qps);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 15);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 15);
+
+    plan = {
+        "set test / throttle by size / hotkey test", operation_type::set, 
1024, 1, 50, false, true};
+    set_throttle(throttle_type::write_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)plan.limit_qps * plan.single_value_sz * 
plan.multi_count * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)plan.limit_qps * plan.single_value_sz * 
plan.multi_count * 0.7);
+
+    plan = {
+        "get test / throttle by size / hotkey test", operation_type::get, 
1024, 1, 50, false, true};
+    set_throttle(throttle_type::read_by_size,
+                 plan.limit_qps * plan.single_value_sz * plan.multi_count);
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_size_per_sec"],
+              (uint64_t)plan.limit_qps * plan.single_value_sz * 
plan.multi_count * 1.3);
+    ASSERT_GT(result.records["total_size_per_sec"],
+              (uint64_t)plan.limit_qps * plan.single_value_sz * 
plan.multi_count * 0.7);
+
+    // mix delay&reject test
+    plan = {
+        "set test / throttle by qps 500 / no delay throttle", 
operation_type::set, 1024, 1, 500};
+    ddl_client->set_app_envs(app_name, {"replica.write_throttling"}, 
{"500*reject*200"});
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 100);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 100);
+
+    plan = {
+        "get test / throttle by qps 500 / no delay throttle", 
operation_type::get, 1024, 1, 500};
+    ddl_client->set_app_envs(app_name, {"replica.read_throttling"}, 
{"500*reject*200"});
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 100);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 100);
+
+    plan = {"set test / throttle by qps 500 / delay throttle", 
operation_type::set, 1024, 1, 500};
+    ddl_client->set_app_envs(
+        app_name, {"replica.write_throttling"}, 
{"300*delay*100,500*reject*200"});
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 100);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 100);
+
+    plan = {"get test / throttle by qps 500 / delay throttle", 
operation_type::get, 1024, 1, 500};
+    ddl_client->set_app_envs(
+        app_name, {"replica.read_throttling"}, 
{"300*delay*100,500*reject*200"});
+    std::cout << "wait 30s for setting env" << std::endl;
+    sleep(30);
+    result = start_test(plan);
+    restore_throttle();
+    ASSERT_LE(result.records["total_qps"], plan.limit_qps + 100);
+    ASSERT_GT(result.records["total_qps"], plan.limit_qps - 100);
+}
diff --git a/src/test/function_test/utils.h b/src/test/function_test/utils.h
index fc597a2..c08e130 100644
--- a/src/test/function_test/utils.h
+++ b/src/test/function_test/utils.h
@@ -19,6 +19,10 @@
 
 #pragma once
 
+#include <dsn/utility/rand.h>
+#include <dsn/c/api_utilities.h>
+#include <dsn/dist/fmt_logging.h>
+
 #define RETRY_OPERATION(CLIENT_FUNCTION, RESULT)                               
                    \
     do {                                                                       
                    \
         for (int i = 0; i < 60; ++i) {                                         
                    \
@@ -30,3 +34,49 @@
             }                                                                  
                    \
         }                                                                      
                    \
     } while (0)
+
+inline std::string generate_random_string(uint32_t str_len = 20)
+{
+    static const std::string chars("abcdefghijklmnopqrstuvwxyz"
+                                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                   "1234567890");
+    std::string result;
+    for (int i = 0; i < str_len; i++) {
+        result += chars[dsn::rand::next_u32(chars.size())];
+    }
+    return result;
+}
+
+inline std::string generate_hotkey(bool is_hotkey, int probability = 100, 
uint32_t str_len = 20)
+{
+    if (is_hotkey && (dsn::rand::next_u32(100) < probability)) {
+        return "ThisisahotkeyThisisahotkey";
+    }
+    return generate_random_string(str_len);
+}
+
+inline std::vector<std::string> generate_str_vector_by_random(uint32_t 
single_str_len,
+                                                              uint32_t arr_len,
+                                                              bool 
random_value_size = false)
+{
+    std::vector<std::string> result;
+    result.reserve(arr_len);
+    for (int i = 0; i < arr_len; i++) {
+        result.emplace_back(generate_random_string(
+            random_value_size ? dsn::rand::next_u32(single_str_len) : 
single_str_len));
+    }
+    return result;
+}
+
+inline std::map<std::string, std::string>
+generate_sortkey_value_map(const std::vector<std::string> sortkeys,
+                           const std::vector<std::string> values)
+{
+    std::map<std::string, std::string> result;
+    dcheck_eq(sortkeys.size(), values.size());
+    int len = sortkeys.size();
+    for (int i = 0; i < len; i++) {
+        result.emplace(std::make_pair(sortkeys[i], values[i]));
+    }
+    return result;
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to