This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new bacc12a8b feat(table_env): Support to list all available table envs
(#1915)
bacc12a8b is described below
commit bacc12a8bc37609f71c2a2ca3af836ef6c376fa7
Author: Yingchun Lai <[email protected]>
AuthorDate: Fri Apr 19 15:34:08 2024 +0800
feat(table_env): Support to list all available table envs (#1915)
Now we can query all available table environments by `/envs/list` path of
meta server, e.g. `curl 127.0.0.1:34601/envs/list`, the output looks like:
```
$ curl 127.0.0.1:34601/envs/list
{
"business.info": {
"limitation": "",
"sample": "",
"type": "string"
},
"default_ttl": {
"limitation": ">= 0",
"sample": "86400",
"type": "unsigned int32"
},
"manual_compact.disabled": {
"limitation": "true | false",
"sample": "true",
"type": "bool"
},
...
}
```
---
src/base/meta_store.cpp | 5 +-
src/base/meta_store.h | 3 -
src/common/replica_envs.cpp | 5 +
src/common/replica_envs.h | 11 +-
src/http/http_call_registry.h | 5 +
src/http/http_server.cpp | 5 +
src/http/http_server.h | 2 +
src/meta/app_env_validator.cpp | 432 +++++++++++++--------
src/meta/app_env_validator.h | 77 +++-
src/meta/meta_http_service.cpp | 7 +-
src/meta/server_state.cpp | 18 +-
src/meta/server_state.h | 3 +
src/meta/test/meta_app_envs_test.cpp | 46 +--
src/meta/test/meta_http_service_test.cpp | 8 +-
src/meta/test/meta_mauanl_compaction_test.cpp | 64 ++-
src/meta/test/meta_test_base.cpp | 2 +-
src/meta/test/server_state_test.cpp | 49 ++-
src/replica/replica.h | 12 +-
src/replica/replica_throttle.cpp | 14 +-
src/replica/test/throttling_controller_test.cpp | 18 +-
src/server/pegasus_manual_compact_service.cpp | 11 +-
src/server/pegasus_manual_compact_service.h | 3 -
src/server/pegasus_server_impl.cpp | 36 +-
src/server/test/manual_compact_service_test.cpp | 4 +-
src/server/test/pegasus_server_impl_test.cpp | 14 +-
.../token_bucket_throttling_controller_test.cpp | 183 ++++++---
src/utils/throttling_controller.cpp | 199 +++++++---
src/utils/throttling_controller.h | 30 +-
src/utils/token_bucket_throttling_controller.cpp | 88 ++---
src/utils/token_bucket_throttling_controller.h | 4 +-
30 files changed, 874 insertions(+), 484 deletions(-)
diff --git a/src/base/meta_store.cpp b/src/base/meta_store.cpp
index 94aaf7db7..f9ca38ab7 100644
--- a/src/base/meta_store.cpp
+++ b/src/base/meta_store.cpp
@@ -34,9 +34,6 @@ const std::string meta_store::DATA_VERSION =
"pegasus_data_version";
const std::string meta_store::LAST_FLUSHED_DECREE =
"pegasus_last_flushed_decree";
const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
"pegasus_last_manual_compact_finish_time";
-const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal";
-const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE =
"prefer_write";
-const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD =
"bulk_load";
meta_store::meta_store(const char *log_prefix,
rocksdb::DB *db,
@@ -85,7 +82,7 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB
*db,
std::string meta_store::get_usage_scenario() const
{
// If couldn't find rocksdb usage scenario in meta column family, return
normal in default.
- std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+ std::string usage_scenario =
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
auto ec = get_string_value_from_meta_cf(
false, dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, &usage_scenario);
CHECK_PREFIX_MSG(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND,
diff --git a/src/base/meta_store.h b/src/base/meta_store.h
index 060692d5c..a6e8e953b 100644
--- a/src/base/meta_store.h
+++ b/src/base/meta_store.h
@@ -92,9 +92,6 @@ private:
static const std::string DATA_VERSION;
static const std::string LAST_FLUSHED_DECREE;
static const std::string LAST_MANUAL_COMPACT_FINISH_TIME;
- static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
- static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
- static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
const std::string _log_prefix;
rocksdb::DB *_db;
diff --git a/src/common/replica_envs.cpp b/src/common/replica_envs.cpp
index f1b85b0f2..2094f2e16 100644
--- a/src/common/replica_envs.cpp
+++ b/src/common/replica_envs.cpp
@@ -21,6 +21,11 @@
namespace dsn {
const uint64_t replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS = 20;
+const std::string
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
+const std::string
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");
+const std::string replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL("normal");
+const std::string
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE("prefer_write");
+const std::string
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD("bulk_load");
const std::string
replica_envs::DENY_CLIENT_REQUEST("replica.deny_client_request");
const std::string
replica_envs::WRITE_QPS_THROTTLING("replica.write_throttling");
const std::string
replica_envs::WRITE_SIZE_THROTTLING("replica.write_throttling_by_size");
diff --git a/src/common/replica_envs.h b/src/common/replica_envs.h
index e80836397..d1a562cb1 100644
--- a/src/common/replica_envs.h
+++ b/src/common/replica_envs.h
@@ -35,8 +35,7 @@ namespace dsn {
class replica_envs
{
public:
- static const uint64_t MIN_SLOW_QUERY_THRESHOLD_MS;
-
+ // Environment variable keys.
static const std::string DENY_CLIENT_REQUEST;
static const std::string WRITE_QPS_THROTTLING;
static const std::string WRITE_SIZE_THROTTLING;
@@ -74,6 +73,14 @@ public:
static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
+
+ // Environment variable values.
+ static const uint64_t MIN_SLOW_QUERY_THRESHOLD_MS;
+ static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
+ static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
+ static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+ static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
+ static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
};
} // namespace dsn
diff --git a/src/http/http_call_registry.h b/src/http/http_call_registry.h
index 2e944463c..69eb80fc1 100644
--- a/src/http/http_call_registry.h
+++ b/src/http/http_call_registry.h
@@ -51,7 +51,12 @@ public:
void add(const std::shared_ptr<http_call> &call)
{
std::lock_guard<std::mutex> guard(_mu);
+// Some tests (e.g. policy_context_test) create multiple objects which
+// register duplicate http paths, so disable checking the path when
+// MOCK_TEST enabled.
+#ifndef MOCK_TEST
CHECK_EQ_MSG(_call_map.count(call->path), 0, "{} has been added",
call->path);
+#endif
_call_map[call->path] = call;
}
diff --git a/src/http/http_server.cpp b/src/http/http_server.cpp
index cbd214bba..ce2178bfd 100644
--- a/src/http/http_server.cpp
+++ b/src/http/http_server.cpp
@@ -109,6 +109,11 @@ void http_service::register_handler(std::string sub_path,
http_call_registry::instance().add(std::move(call));
}
+void http_service::deregister_handler(std::string sub_path) const
+{
+ http_call_registry::instance().remove(get_rel_path(sub_path));
+}
+
void http_server_base::update_config_handler(const http_request &req,
http_response &resp)
{
auto res = dsn::update_config(req);
diff --git a/src/http/http_server.h b/src/http/http_server.h
index a9cf668b2..f2c393085 100644
--- a/src/http/http_server.h
+++ b/src/http/http_server.h
@@ -103,6 +103,8 @@ public:
std::string parameters,
std::string help) const;
+ void deregister_handler(std::string sub_path) const;
+
private:
// If sub_path is 'app/duplication', the built path would be
'<root_path>/app/duplication',
// where path() would be called as root_path.
diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp
index e068022a0..229389b3b 100644
--- a/src/meta/app_env_validator.cpp
+++ b/src/meta/app_env_validator.cpp
@@ -19,60 +19,54 @@
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
+#include <fmt/format.h>
+#include <nlohmann/json.hpp>
#include <stdint.h>
-#include <memory>
#include <set>
#include <utility>
#include <vector>
#include "common/replica_envs.h"
+#include "http/http_status_code.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
+#include "utils/throttling_controller.h"
#include "utils/token_bucket_throttling_controller.h"
namespace dsn {
namespace replication {
-bool validate_app_envs(const std::map<std::string, std::string> &envs)
+app_env_validator::app_env_validator()
{
- // only check rocksdb app envs currently
+ register_all_validators();
+ register_handler(
+ "list",
+ std::bind(
+ &app_env_validator::list_all_envs, this, std::placeholders::_1,
std::placeholders::_2),
+ "List all available table environments.");
+}
+
+app_env_validator::~app_env_validator() { deregister_handler("list"); }
- for (const auto &it : envs) {
- if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(it.first) ==
+bool app_env_validator::validate_app_envs(const std::map<std::string,
std::string> &envs)
+{
+ // only check rocksdb app envs currently
+ for (const auto & [ key, value ] : envs) {
+ if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(key) ==
replica_envs::ROCKSDB_STATIC_OPTIONS.end() &&
- replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(it.first) ==
+ replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(key) ==
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.end()) {
continue;
}
std::string hint_message;
- if (!validate_app_env(it.first, it.second, hint_message)) {
- LOG_WARNING(
- "app env {}={} is invaild, hint_message:{}", it.first,
it.second, hint_message);
+ if (!validate_app_env(key, value, hint_message)) {
+ LOG_WARNING("app env '{}={}' is invalid, hint_message: {}", key,
value, hint_message);
return false;
}
}
return true;
}
-bool validate_app_env(const std::string &env_name,
- const std::string &env_value,
- std::string &hint_message)
-{
- return app_env_validator::instance().validate_app_env(env_name, env_value,
hint_message);
-}
-
-bool check_slow_query(const std::string &env_value, std::string &hint_message)
-{
- uint64_t threshold = 0;
- if (!dsn::buf2uint64(env_value, threshold) ||
- threshold < replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS) {
- hint_message = fmt::format("Slow query threshold must be >= {}ms",
- replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS);
- return false;
- }
- return true;
-}
-
bool check_deny_client(const std::string &env_value, std::string &hint_message)
{
std::vector<std::string> sub_sargs;
@@ -93,184 +87,290 @@ bool check_deny_client(const std::string &env_value,
std::string &hint_message)
return true;
}
-bool check_rocksdb_iteration(const std::string &env_value, std::string
&hint_message)
+bool check_throttling(const std::string &env_value, std::string &hint_message)
{
- uint64_t threshold = 0;
- if (!dsn::buf2uint64(env_value, threshold) || threshold < 0) {
- hint_message = "Rocksdb iteration threshold must be greater than zero";
- return false;
- }
- return true;
+ uint64_t delay_units = 0;
+ uint64_t delay_ms = 0;
+ uint64_t reject_units = 0;
+ uint64_t reject_delay_ms = 0;
+ return utils::throttling_controller::parse_from_env(
+ env_value, delay_units, delay_ms, reject_units, reject_delay_ms,
hint_message);
}
-bool check_throttling(const std::string &env_value, std::string &hint_message)
+void app_env_validator::EnvInfo::init()
{
- std::vector<std::string> sargs;
- utils::split_args(env_value.c_str(), sargs, ',');
- if (sargs.empty()) {
- hint_message = "The value shouldn't be empty";
- return false;
- }
-
- // example for sarg: 100K*delay*100 / 100M*reject*100
- bool reject_parsed = false;
- bool delay_parsed = false;
- for (std::string &sarg : sargs) {
- std::vector<std::string> sub_sargs;
- utils::split_args(sarg.c_str(), sub_sargs, '*', true);
- if (sub_sargs.size() != 3) {
- hint_message = fmt::format("The field count of {} should be 3",
sarg);
- return false;
- }
-
- // check the first part, which is must be a positive number followed
with 'K' or 'M'
- int64_t units = 0;
- if (!sub_sargs[0].empty() &&
- ('M' == *sub_sargs[0].rbegin() || 'K' == *sub_sargs[0].rbegin())) {
- sub_sargs[0].pop_back();
- }
- if (!buf2int64(sub_sargs[0], units) || units < 0) {
- hint_message = fmt::format("{} should be non-negative int",
sub_sargs[0]);
- return false;
- }
-
- // check the second part, which is must be "delay" or "reject"
- if (sub_sargs[1] == "delay") {
- if (delay_parsed) {
- hint_message = "duplicate delay config";
- return false;
- }
- delay_parsed = true;
- } else if (sub_sargs[1] == "reject") {
- if (reject_parsed) {
- hint_message = "duplicate reject config";
- return false;
- }
- reject_parsed = true;
- } else {
- hint_message = fmt::format("{} should be \"delay\" or \"reject\"",
sub_sargs[1]);
- return false;
+ // Set default limitation description.
+ if (limit_desc.empty()) {
+ switch (type) {
+ case ValueType::kBool:
+ limit_desc = "true | false";
+ break;
+ case ValueType::kString:
+ break;
+ default:
+ CHECK_TRUE(false);
+ __builtin_unreachable();
}
+ }
- // check the third part, which is must be a positive number or 0
- int64_t delay_ms = 0;
- if (!buf2int64(sub_sargs[2], delay_ms) || delay_ms < 0) {
- hint_message = fmt::format("{} should be non-negative int",
sub_sargs[2]);
- return false;
+ // Set default sample.
+ if (sample.empty()) {
+ switch (type) {
+ case ValueType::kBool:
+ sample = "true";
+ break;
+ case ValueType::kString:
+ break;
+ default:
+ CHECK_TRUE(false);
+ __builtin_unreachable();
}
}
-
- return true;
}
-bool check_bool_value(const std::string &env_value, std::string &hint_message)
-{
- bool result = false;
- if (!dsn::buf2bool(env_value, result)) {
- hint_message = fmt::format("invalid string {}, should be \"true\" or
\"false\"", env_value);
- return false;
- }
- return true;
-}
+app_env_validator::EnvInfo::EnvInfo(ValueType t) : type(t) { init(); }
-bool check_rocksdb_write_buffer_size(const std::string &env_value, std::string
&hint_message)
+app_env_validator::EnvInfo::EnvInfo(ValueType t,
+ std::string ld,
+ std::string s,
+ string_validator_func v)
+ : type(t), limit_desc(std::move(ld)), sample(std::move(s)),
string_validator(std::move(v))
{
- uint64_t val = 0;
-
- if (!dsn::buf2uint64(env_value, val)) {
- hint_message = fmt::format("rocksdb.write_buffer_size cannot set this
val: {}", env_value);
- return false;
- }
- if (val < (16 << 20) || val > (512 << 20)) {
- hint_message =
- fmt::format("rocksdb.write_buffer_size suggest set val in range
[16777216, 536870912]");
- return false;
- }
- return true;
+ CHECK_TRUE(type == ValueType::kString);
+ init();
}
-bool check_rocksdb_num_levels(const std::string &env_value, std::string
&hint_message)
-{
- int32_t val = 0;
- if (!dsn::buf2int32(env_value, val)) {
- hint_message = fmt::format("rocksdb.num_levels cannot set this val:
{}", env_value);
- return false;
- }
- if (val < 1 || val > 10) {
- hint_message = fmt::format("rocksdb.num_levels suggest set val in
range [1 , 10]");
- return false;
- }
- return true;
+app_env_validator::EnvInfo::EnvInfo(ValueType t,
+ std::string ld,
+ std::string s,
+ int_validator_func v)
+ : type(t), limit_desc(std::move(ld)), sample(std::move(s)),
int_validator(std::move(v))
+{
+ CHECK_TRUE(type == ValueType::kInt64 || type == ValueType::kInt32);
+ init();
}
bool app_env_validator::validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message)
{
- auto func_iter = _validator_funcs.find(env_name);
- if (func_iter != _validator_funcs.end()) {
- // check function == nullptr means no check
- if (nullptr != func_iter->second && !func_iter->second(env_value,
hint_message)) {
- LOG_WARNING("{}={} is invalid.", env_name, env_value);
+ // Check if the env is supported.
+ const auto func_iter = _validator_funcs.find(env_name);
+ if (func_iter == _validator_funcs.end()) {
+ hint_message = fmt::format("app_env '{}' is not supported", env_name);
+ return false;
+ }
+
+ // 'int_result' will be used if the env variable is integer type.
+ int64_t int_result = 0;
+ switch (func_iter->second.type) {
+ case ValueType::kBool: {
+ // Check by the default boolean validator.
+ bool result = false;
+ if (!dsn::buf2bool(env_value, result)) {
+ hint_message = fmt::format("invalid value '{}', should be a
boolean", env_value);
+ return false;
+ }
+ break;
+ }
+ case ValueType::kInt32: {
+ // Check by the default int32 validator.
+ int32_t result = 0;
+ if (!dsn::buf2int32(env_value, result)) {
+ hint_message =
+ fmt::format("invalid value '{}', should be an 32 bits
integer", env_value);
+ return false;
+ }
+ int_result = result;
+ break;
+ }
+ case ValueType::kInt64: {
+ // Check by the default int64 validator.
+ int64_t result = 0;
+ if (!dsn::buf2int64(env_value, result)) {
+ hint_message =
+ fmt::format("invalid value '{}', should be an 64 bits
integer", env_value);
+ return false;
+ }
+ int_result = result;
+ break;
+ }
+ case ValueType::kString: {
+ // Check by the self defined validator.
+ if (nullptr != func_iter->second.string_validator &&
+ !func_iter->second.string_validator(env_value, hint_message)) {
return false;
}
+ break;
+ }
+ default:
+ CHECK_TRUE(false);
+ __builtin_unreachable();
+ }
- return true;
+ if (func_iter->second.type == ValueType::kInt32 ||
+ func_iter->second.type == ValueType::kInt64) {
+ // Check by the self defined validator.
+ if (nullptr != func_iter->second.int_validator &&
+ !func_iter->second.int_validator(int_result)) {
+ hint_message = fmt::format(
+ "invalid value '{}', should be '{}'", env_value,
func_iter->second.limit_desc);
+ return false;
+ }
}
- hint_message = fmt::format("app_env \"{}\" is not supported", env_name);
- return false;
+ return true;
}
void app_env_validator::register_all_validators()
{
+ static const auto kMinWriteBufferSize = 16 << 20;
+ static const auto kMaxWriteBufferSize = 512 << 20;
+ static const auto kMinLevel = 1;
+ static const auto kMaxLevel = 10;
+ static const std::string check_throttling_limit =
"<size[K|M]>*<delay|reject>*<milliseconds>";
+ static const std::string check_throttling_sample =
"10000*delay*100,20000*reject*100";
+
+ // TODO(yingchun): Use a macro to simplify the following 2 code blocks.
+ // EnvInfo for MANUAL_COMPACT_*_BOTTOMMOST_LEVEL_COMPACTION.
+ const std::set<std::string> valid_mcblcs(
+ {replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
+ replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP});
+ const std::string mcblc_sample(fmt::format("{}", fmt::join(valid_mcblcs, "
| ")));
+ const app_env_validator::EnvInfo mcblc(
+ app_env_validator::ValueType::kString,
+ mcblc_sample,
+ replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
+ [=](const std::string &new_value, std::string &hint_message) {
+ if (valid_mcblcs.count(new_value) == 0) {
+ hint_message = mcblc_sample;
+ return false;
+ }
+ return true;
+ });
+
+ // EnvInfo for ROCKSDB_USAGE_SCENARIO.
+ const std::set<std::string>
valid_russ({replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE,
+
replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD});
+ const std::string rus_sample(fmt::format("{}", fmt::join(valid_russ, " |
")));
+ const app_env_validator::EnvInfo rus(
+ app_env_validator::ValueType::kString,
+ rus_sample,
+ replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+ [=](const std::string &new_value, std::string &hint_message) {
+ if (valid_russ.count(new_value) == 0) {
+ hint_message = rus_sample;
+ return false;
+ }
+ return true;
+ });
+
_validator_funcs = {
{replica_envs::SLOW_QUERY_THRESHOLD,
- std::bind(&check_slow_query, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kInt64,
+ fmt::format(">= {}", replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS),
+ "1000",
+ [](int64_t new_value) {
+ return replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS <= new_value;
+ }}},
{replica_envs::WRITE_QPS_THROTTLING,
- std::bind(&check_throttling, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kString, check_throttling_limit, check_throttling_sample,
&check_throttling}},
{replica_envs::WRITE_SIZE_THROTTLING,
- std::bind(&check_throttling, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kString, check_throttling_limit, check_throttling_sample,
&check_throttling}},
{replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS,
- std::bind(&check_rocksdb_iteration, std::placeholders::_1,
std::placeholders::_2)},
- {replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED,
- std::bind(&check_bool_value, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kInt64, ">= 0", "1000", [](int64_t new_value) { return
new_value >= 0; }}},
+ {replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED, {ValueType::kBool}},
{replica_envs::READ_QPS_THROTTLING,
- std::bind(&check_throttling, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kString, check_throttling_limit, check_throttling_sample,
&check_throttling}},
{replica_envs::READ_SIZE_THROTTLING,
- std::bind(&utils::token_bucket_throttling_controller::validate,
- std::placeholders::_1,
- std::placeholders::_2)},
- {replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
- std::bind(&check_bool_value, std::placeholders::_1,
std::placeholders::_2)},
- {replica_envs::USER_SPECIFIED_COMPACTION, nullptr},
+ {ValueType::kString,
+ "",
+ "20000*delay*100,20000*reject*100",
+ &utils::token_bucket_throttling_controller::validate}},
+ {replica_envs::SPLIT_VALIDATE_PARTITION_HASH, {ValueType::kBool}},
+ {replica_envs::USER_SPECIFIED_COMPACTION, {ValueType::kString}},
{replica_envs::BACKUP_REQUEST_QPS_THROTTLING,
- std::bind(&check_throttling, std::placeholders::_1,
std::placeholders::_2)},
- {replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND,
- std::bind(&check_bool_value, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kString, check_throttling_limit, check_throttling_sample,
&check_throttling}},
+ {replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, {ValueType::kBool}},
{replica_envs::DENY_CLIENT_REQUEST,
- std::bind(&check_deny_client, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kString,
+ "timeout*all | timeout*write | timeout*read | reconfig*all |
reconfig*write | "
+ "reconfig*read",
+ "timeout*all",
+ &check_deny_client}},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
- std::bind(&check_rocksdb_write_buffer_size, std::placeholders::_1,
std::placeholders::_2)},
+ {ValueType::kInt64,
+ fmt::format("In range [{}, {}]", kMinWriteBufferSize,
kMaxWriteBufferSize),
+ fmt::format("{}", kMinWriteBufferSize),
+ [](int64_t new_value) {
+ return kMinWriteBufferSize <= new_value && new_value <=
kMaxWriteBufferSize;
+ }}},
{replica_envs::ROCKSDB_NUM_LEVELS,
- std::bind(&check_rocksdb_num_levels, std::placeholders::_1,
std::placeholders::_2)},
- // TODO(zhaoliwei): not implemented
- {replica_envs::BUSINESS_INFO, nullptr},
- {replica_envs::TABLE_LEVEL_DEFAULT_TTL, nullptr},
- {replica_envs::ROCKSDB_USAGE_SCENARIO, nullptr},
- {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT, nullptr},
- {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS, nullptr},
- {replica_envs::MANUAL_COMPACT_DISABLED, nullptr},
- {replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT, nullptr},
- {replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME, nullptr},
- {replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, nullptr},
- {replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
nullptr},
- {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr},
- {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
- {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
nullptr},
- {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
- {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
- };
+ {ValueType::kInt64,
+ fmt::format("In range [{}, {}]", kMinLevel, kMaxLevel),
+ "6",
+ [](int64_t new_value) { return kMinLevel <= new_value && new_value
<= kMaxLevel; }}},
+ {replica_envs::BUSINESS_INFO, {ValueType::kString}},
+ {replica_envs::TABLE_LEVEL_DEFAULT_TTL,
+ {ValueType::kInt32, ">= 0", "86400", [](int64_t new_value) { return
new_value >= 0; }}},
+ {replica_envs::ROCKSDB_USAGE_SCENARIO, rus},
+ {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
+ {ValueType::kInt32, "> 0", "2", [](int64_t new_value) { return
new_value > 0; }}},
+ {replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS,
+ {ValueType::kInt32, ">= 0", "3600", [](int64_t new_value) { return
new_value >= 0; }}},
+ {replica_envs::MANUAL_COMPACT_DISABLED, {ValueType::kBool}},
+ {replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT,
+ {ValueType::kInt32, ">= 0", "8", [](int64_t new_value) { return
new_value >= 0; }}},
+ {replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+ {ValueType::kInt64,
+ "> 0, timestamp (in seconds) to trigger the once manual compaction",
+ "1700000000",
+ [](int64_t new_value) { return new_value >= 0; }}},
+ {replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
+ {ValueType::kInt64,
+ "-1 or >= 1",
+ "6",
+ [](int64_t new_value) { return new_value == -1 || new_value >= 1;
}}},
+ {replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION, mcblc},
+ // TODO(yingchun): enable the validator by refactoring
+ // pegasus_manual_compact_service::check_periodic_compact
+ {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
{ValueType::kString}},
+ {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
+ {ValueType::kInt64,
+ "-1 or >= 1",
+ "6",
+ [](int64_t new_value) { return new_value == -1 || new_value >= 1;
}}},
+ {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
mcblc},
+ {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS,
{ValueType::kString}},
+ {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES,
{ValueType::kString}}};
+}
+
+const std::unordered_map<app_env_validator::ValueType, std::string>
+ app_env_validator::EnvInfo::ValueType2String{{ValueType::kBool, "bool"},
+ {ValueType::kInt32, "unsigned
int32"},
+ {ValueType::kInt64, "unsigned
int64"},
+ {ValueType::kString,
"string"}};
+
+nlohmann::json app_env_validator::EnvInfo::to_json() const
+{
+ const auto &type_str = ValueType2String.find(type);
+ CHECK_TRUE(type_str != ValueType2String.end());
+ nlohmann::json info;
+ info["type"] = type_str->second;
+ info["limitation"] = limit_desc;
+ info["sample"] = sample;
+ return info;
+}
+
+void app_env_validator::list_all_envs(const http_request &req, http_response
&resp) const
+{
+ nlohmann::json envs;
+ for (const auto &validator_func : _validator_funcs) {
+ envs[validator_func.first] = validator_func.second.to_json();
+ }
+ resp.body = envs.dump(2);
+ resp.status_code = http_status_code::kOk;
}
} // namespace replication
diff --git a/src/meta/app_env_validator.h b/src/meta/app_env_validator.h
index d963df051..7a0ca1175 100644
--- a/src/meta/app_env_validator.h
+++ b/src/meta/app_env_validator.h
@@ -17,37 +17,86 @@
#pragma once
+#include <nlohmann/json.hpp> // IWYU pragma: keep
+#include <nlohmann/json_fwd.hpp>
+#include <stdint.h>
#include <functional>
#include <map>
#include <string>
-#include "utils/singleton.h"
+#include <unordered_map>
+
+#include "http/http_server.h"
namespace dsn {
namespace replication {
-bool validate_app_envs(const std::map<std::string, std::string> &envs);
-
-bool validate_app_env(const std::string &env_name,
- const std::string &env_value,
- std::string &hint_message);
-
-class app_env_validator : public utils::singleton<app_env_validator>
+class app_env_validator final : public http_service
{
public:
+ app_env_validator();
+
+ ~app_env_validator() final;
+
+ std::string path() const override { return "envs"; }
+
bool validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message);
-private:
- app_env_validator() { register_all_validators(); }
- ~app_env_validator() = default;
+ bool validate_app_envs(const std::map<std::string, std::string> &envs);
+private:
void register_all_validators();
- using validator_func = std::function<bool(const std::string &, std::string
&)>;
- std::map<std::string, validator_func> _validator_funcs;
+ void list_all_envs(const http_request &req, http_response &resp) const;
+
+public:
+ // The type of table env.
+ enum class ValueType : uint32_t
+ {
+ kBool,
+ kInt32,
+ kInt64,
+ kString
+ };
+
+ // The type of table env and its limit description.
+ struct EnvInfo
+ {
+ using string_validator_func = std::function<bool(const std::string &,
std::string &)>;
+ using int_validator_func = std::function<bool(int64_t)>;
+
+ ValueType type;
+ std::string limit_desc;
+ std::string sample;
+ string_validator_func string_validator;
+ int_validator_func int_validator;
+
+ // Construct an object.
+ EnvInfo(ValueType t);
+
+ // Construct an object with kString type.
+ EnvInfo(ValueType t,
+ std::string ld,
+ std::string s,
+ string_validator_func v = string_validator_func());
+
+ // Construct an object with kInt64 type.
+ EnvInfo(ValueType t,
+ std::string ld,
+ std::string s,
+ int_validator_func v = int_validator_func());
+
+ nlohmann::json to_json() const;
+
+ static const std::unordered_map<app_env_validator::ValueType,
std::string> ValueType2String;
+
+ private:
+ void init();
+ };
- friend class utils::singleton<app_env_validator>;
+ // The table envs and their limit descriptions, all available envs must be
registered here.
+ std::map<std::string, EnvInfo> _validator_funcs;
};
} // namespace replication
diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp
index 0a9e1bbf9..876409644 100644
--- a/src/meta/meta_http_service.cpp
+++ b/src/meta/meta_http_service.cpp
@@ -761,8 +761,11 @@ void meta_http_service::start_compaction_handler(const
http_request &req, http_r
resp.status_code = http_status_code::kBadRequest;
return;
}
- if (info.bottommost_level_compaction.empty() ||
(info.bottommost_level_compaction != "skip" &&
-
info.bottommost_level_compaction != "force")) {
+ if (info.bottommost_level_compaction.empty() ||
+ (info.bottommost_level_compaction !=
+ replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP &&
+ info.bottommost_level_compaction !=
+ replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE)) {
resp.body = "bottommost_level_compaction should ony be 'skip' or
'force'";
resp.status_code = http_status_code::kBadRequest;
return;
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index f563b34f2..fd2701a9c 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -41,7 +41,6 @@
// IWYU pragma: no_include <type_traits>
#include <unordered_map>
-#include "app_env_validator.h"
#include "common/duplication_common.h"
#include "common/json_helper.h"
#include "common/replica_envs.h"
@@ -50,6 +49,7 @@
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "dump_file.h"
+#include "meta/app_env_validator.h"
#include "meta/meta_data.h"
#include "meta/meta_service.h"
#include "meta/meta_state_service.h"
@@ -1121,7 +1121,7 @@ void server_state::create_app(dsn::message_ex *msg)
!validate_target_max_replica_count(request.options.replica_count)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
- } else if (!validate_app_envs(request.options.envs)) {
+ } else if (!_app_env_validator.validate_app_envs(request.options.envs)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else {
@@ -2802,10 +2802,16 @@ void server_state::set_app_envs(const app_env_rpc
&env_rpc)
std::ostringstream os;
for (int i = 0; i < keys.size(); i++) {
- if (i != 0)
+ if (i != 0) {
os << ", ";
+ }
- if (!validate_app_env(keys[i], values[i],
env_rpc.response().hint_message)) {
+ if (!_app_env_validator.validate_app_env(
+ keys[i], values[i], env_rpc.response().hint_message)) {
+ LOG_WARNING("app env '{}={}' is invalid, hint_message: {}",
+ keys[i],
+ values[i],
+ env_rpc.response().hint_message);
env_rpc.response().err = ERR_INVALID_PARAMETERS;
return;
}
@@ -3143,9 +3149,9 @@ bool
server_state::parse_compaction_envs(start_manual_compact_rpc rpc,
}
}
- std::string bottommost = "skip";
+ std::string bottommost =
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
if (request.__isset.bottommost && request.bottommost) {
- bottommost = "force";
+ bottommost =
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
}
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION);
values.emplace_back(bottommost);
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index aad53ec5c..ad3664cf1 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -38,6 +38,7 @@
#include <utility>
#include <vector>
+#include "app_env_validator.h"
#include "common/gpid.h"
#include "common/manual_compact.h"
#include "dsn.layer2_types.h"
@@ -429,6 +430,8 @@ private:
int32_t _add_secondary_max_count_for_one_node;
std::vector<std::unique_ptr<command_deregister>> _cmds;
+ app_env_validator _app_env_validator;
+
table_metric_entities _table_metric_entities;
};
diff --git a/src/meta/test/meta_app_envs_test.cpp
b/src/meta/test/meta_app_envs_test.cpp
index f3422f7c7..58bdf83ef 100644
--- a/src/meta/test/meta_app_envs_test.cpp
+++ b/src/meta/test/meta_app_envs_test.cpp
@@ -28,6 +28,7 @@
#include <memory>
#include <set>
#include <string>
+#include <utility>
#include "common/replica_envs.h"
#include "gtest/gtest.h"
@@ -70,12 +71,12 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::WRITE_QPS_THROTTLING,
"20A*delay*100",
ERR_INVALID_PARAMETERS,
- "20A should be non-negative int",
+ "'20A' should be an unsigned integer",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"-20*delay*100",
ERR_INVALID_PARAMETERS,
- "-20 should be non-negative int",
+ "'-20' should be an unsigned integer",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"",
@@ -85,37 +86,37 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::WRITE_QPS_THROTTLING,
"20A*delay",
ERR_INVALID_PARAMETERS,
- "The field count of 20A*delay should be 3",
+ "The field count of '20A*delay' separated by '*' must be 3",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"20K*pass*100",
ERR_INVALID_PARAMETERS,
- "pass should be \"delay\" or \"reject\"",
+ "'pass' should be 'delay' or 'reject'",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"20K*delay*-100",
ERR_INVALID_PARAMETERS,
- "-100 should be non-negative int",
+ "'-100' should be an unsigned integer",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"2K**delay*100",
ERR_INVALID_PARAMETERS,
- "The field count of 2K**delay*100 should be 3",
+ "The field count of '2K**delay*100' separated by '*' must be 3",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"2K*delay**100",
ERR_INVALID_PARAMETERS,
- "The field count of 2K*delay**100 should be 3",
+ "The field count of '2K*delay**100' separated by '*' must be 3",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"2K*delay*100,3K*delay*100",
ERR_INVALID_PARAMETERS,
- "duplicate delay config",
+ "duplicate 'delay' config",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING,
"2K*reject*100,3K*reject*100",
ERR_INVALID_PARAMETERS,
- "duplicate reject config",
+ "duplicate 'reject' config",
"20M*delay*100"},
{replica_envs::WRITE_QPS_THROTTLING, "20M*reject*100", ERR_OK, "",
"20M*reject*100"},
{replica_envs::WRITE_SIZE_THROTTLING, "300*delay*100", ERR_OK, "",
"300*delay*100"},
@@ -124,18 +125,18 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::SLOW_QUERY_THRESHOLD,
"19",
ERR_INVALID_PARAMETERS,
- "Slow query threshold must be >= 20ms",
+ "invalid value '19', should be '>= 20'",
"20"},
{replica_envs::SLOW_QUERY_THRESHOLD,
"0",
ERR_INVALID_PARAMETERS,
- "Slow query threshold must be >= 20ms",
+ "invalid value '0', should be '>= 20'",
"20"},
{replica_envs::TABLE_LEVEL_DEFAULT_TTL, "10", ERR_OK, "", "10"},
- {replica_envs::ROCKSDB_USAGE_SCENARIO, "20", ERR_OK, "", "20"},
+ {replica_envs::ROCKSDB_USAGE_SCENARIO, "bulk_load", ERR_OK, "",
"bulk_load"},
{replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT, "30", ERR_OK, "",
"30"},
{replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS, "40", ERR_OK,
"", "40"},
- {replica_envs::MANUAL_COMPACT_DISABLED, "50", ERR_OK, "", "50"},
+ {replica_envs::MANUAL_COMPACT_DISABLED, "true", ERR_OK, "", "true"},
{replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT, "60",
ERR_OK, "", "60"},
{replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME, "70", ERR_OK, "",
"70"},
{replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, "80", ERR_OK, "",
"80"},
@@ -144,19 +145,19 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"100",
ERR_INVALID_PARAMETERS,
- "rocksdb.write_buffer_size suggest set val in range [16777216,
536870912]",
+ "invalid value '100', should be 'In range [16777216, 536870912]'",
"67108864"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"636870912",
ERR_INVALID_PARAMETERS,
- "rocksdb.write_buffer_size suggest set val in range [16777216,
536870912]",
+ "invalid value '636870912', should be 'In range [16777216,
536870912]'",
"536870912"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, "67108864", ERR_OK, "",
"67108864"},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
- "200",
+ replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
ERR_OK,
"",
- "200"},
+ replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
{replica_envs::BUSINESS_INFO, "300", ERR_OK, "", "300"},
{replica_envs::DENY_CLIENT_REQUEST,
"400",
@@ -188,7 +189,7 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{"not_exist_env",
"500",
ERR_INVALID_PARAMETERS,
- "app_env \"not_exist_env\" is not supported",
+ "app_env 'not_exist_env' is not supported",
""}};
auto app = find_app(app_name);
@@ -196,10 +197,11 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
configuration_update_app_env_response response =
update_app_envs(app_name, {test.env_key}, {test.env_value});
- ASSERT_EQ(response.err, test.err);
- ASSERT_EQ(response.hint_message, test.hint);
- if (app->envs.find(test.env_key) != app->envs.end()) {
- ASSERT_EQ(app->envs.at(test.env_key), test.expect_value);
+ ASSERT_EQ(test.err, response.err) << test.env_key << " : " <<
test.env_value;
+ ASSERT_EQ(test.hint, response.hint_message);
+ const auto it = app->envs.find(test.env_key);
+ if (it != app->envs.end()) {
+ ASSERT_EQ(test.expect_value, it->second);
}
}
diff --git a/src/meta/test/meta_http_service_test.cpp
b/src/meta/test/meta_http_service_test.cpp
index ab0f3cf13..cadcdffc6 100644
--- a/src/meta/test/meta_http_service_test.cpp
+++ b/src/meta/test/meta_http_service_test.cpp
@@ -95,9 +95,9 @@ public:
// env (value_version, 1) was set by create_app
std::string fake_json = R"({")" + env_key + R"(":)" + R"(")" +
env_value + R"(",)" +
R"("value_version":"1"})" + "\n";
- ASSERT_EQ(fake_resp.status_code, http_status_code::kOk)
+ ASSERT_EQ(http_status_code::kOk, fake_resp.status_code)
<< get_http_status_message(fake_resp.status_code);
- ASSERT_EQ(fake_resp.body, fake_json);
+ ASSERT_EQ(fake_json, fake_resp.body);
}
std::unique_ptr<meta_http_service> _mhs;
@@ -381,12 +381,12 @@ TEST_F(meta_bulk_load_http_test, start_compaction_test)
for (const auto &test : tests) {
http_response resp = test_start_compaction(test.request_json);
- ASSERT_EQ(resp.status_code, test.expected_code);
+ ASSERT_EQ(test.expected_code, resp.status_code);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::kOk) {
expected_json += "\n";
}
- ASSERT_EQ(resp.body, expected_json);
+ ASSERT_EQ(expected_json, resp.body);
}
}
diff --git a/src/meta/test/meta_mauanl_compaction_test.cpp
b/src/meta/test/meta_mauanl_compaction_test.cpp
index 3a0184ff7..6853d34e4 100644
--- a/src/meta/test/meta_mauanl_compaction_test.cpp
+++ b/src/meta/test/meta_mauanl_compaction_test.cpp
@@ -165,14 +165,62 @@ TEST_F(meta_app_compaction_test, test_start_compaction)
int32_t running_count;
error_code expected_err;
std::string expected_bottommost;
- } tests[] = {{"app_not_exist", "false", false, -1, 0, ERR_APP_NOT_EXIST,
"skip"},
- {APP_NAME, "true", false, -1, 0, ERR_OPERATION_DISABLED,
"skip"},
- {APP_NAME, "false", false, -5, 0, ERR_INVALID_PARAMETERS,
"skip"},
- {APP_NAME, "false", false, -1, -1, ERR_INVALID_PARAMETERS,
"skip"},
- {APP_NAME, "false", false, -1, 0, ERR_OK, "skip"},
- {APP_NAME, "false", true, -1, 0, ERR_OK, "force"},
- {APP_NAME, "false", false, 1, 0, ERR_OK, "skip"},
- {APP_NAME, "false", true, -1, 1, ERR_OK, "force"}};
+ } tests[] = {{"app_not_exist",
+ "false",
+ false,
+ -1,
+ 0,
+ ERR_APP_NOT_EXIST,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+ {APP_NAME,
+ "true",
+ false,
+ -1,
+ 0,
+ ERR_OPERATION_DISABLED,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+ {APP_NAME,
+ "false",
+ false,
+ -5,
+ 0,
+ ERR_INVALID_PARAMETERS,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+ {APP_NAME,
+ "false",
+ false,
+ -1,
+ -1,
+ ERR_INVALID_PARAMETERS,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+ {APP_NAME,
+ "false",
+ false,
+ -1,
+ 0,
+ ERR_OK,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+ {APP_NAME,
+ "false",
+ true,
+ -1,
+ 0,
+ ERR_OK,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE},
+ {APP_NAME,
+ "false",
+ false,
+ 1,
+ 0,
+ ERR_OK,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP},
+ {APP_NAME,
+ "false",
+ true,
+ -1,
+ 1,
+ ERR_OK,
+
replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE}};
for (const auto &test : tests) {
auto err = start_manual_compaction(test.app_name,
diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp
index 4fb188370..965939d56 100644
--- a/src/meta/test/meta_test_base.cpp
+++ b/src/meta/test/meta_test_base.cpp
@@ -199,7 +199,7 @@ void meta_test_base::create_app(const std::string &name,
uint32_t partition_coun
auto result = fake_create_app(_ss.get(), req);
fake_wait_rpc(result, resp);
- ASSERT_EQ(resp.err, ERR_OK) << resp.err << " " << name;
+ ASSERT_EQ(ERR_OK, resp.err) << name;
// wait for the table to create
ASSERT_TRUE(_ss->spin_wait_staging(30));
diff --git a/src/meta/test/server_state_test.cpp
b/src/meta/test/server_state_test.cpp
index 0c6295b95..7fdee4588 100644
--- a/src/meta/test/server_state_test.cpp
+++ b/src/meta/test/server_state_test.cpp
@@ -32,6 +32,7 @@
#include <utility>
#include <vector>
+#include "common/replica_envs.h"
#include "common/replication.codes.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
@@ -52,24 +53,35 @@ DSN_DECLARE_string(meta_state_service_type);
namespace dsn {
namespace replication {
-static const std::vector<std::string> keys =
{"manual_compact.once.trigger_time",
-
"manual_compact.once.target_level",
-
"manual_compact.once.bottommost_level_compaction",
-
"manual_compact.periodic.trigger_time",
-
"manual_compact.periodic.target_level",
-
"manual_compact.periodic.bottommost_level_compaction",
- "rocksdb.usage_scenario",
-
"rocksdb.checkpoint.reserve_min_count",
-
"rocksdb.checkpoint.reserve_time_seconds"};
+static const std::vector<std::string> keys = {
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL,
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
+ dsn::replica_envs::ROCKSDB_USAGE_SCENARIO,
+ dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT,
+ dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS};
static const std::vector<std::string> values = {
- "p1v1", "p1v2", "p1v3", "p2v1", "p2v2", "p2v3", "p3v1", "p3v2", "p3v3"};
-
-static const std::vector<std::string> del_keys =
{"manual_compact.once.trigger_time",
-
"manual_compact.periodic.trigger_time",
- "rocksdb.usage_scenario"};
-static const std::set<std::string> del_keys_set =
{"manual_compact.once.trigger_time",
-
"manual_compact.periodic.trigger_time",
- "rocksdb.usage_scenario"};
+ "1712846598",
+ "6",
+ dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE,
+ "1712846598",
+ "-1",
+ dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP,
+ dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
+ "1",
+ "0"};
+
+static const std::vector<std::string> del_keys = {
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+ dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
+static const std::set<std::string> del_keys_set = {
+ dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME,
+ dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME,
+ dsn::replica_envs::ROCKSDB_USAGE_SCENARIO};
static const std::string clear_prefix = "rocksdb";
@@ -112,8 +124,7 @@ void meta_service_test_app::app_envs_basic_test()
ss->initialize(svc, apps_root);
ss->_all_apps.emplace(std::make_pair(fake_app->app_id, fake_app));
- dsn::error_code ec = ss->sync_apps_to_remote_storage();
- ASSERT_EQ(ec, dsn::ERR_OK);
+ ASSERT_EQ(dsn::ERR_OK, ss->sync_apps_to_remote_storage());
std::cout << "test server_state::set_app_envs()..." << std::endl;
{
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 3d8b64119..cf95eafeb 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -499,7 +499,7 @@ private:
void update_throttle_envs(const std::map<std::string, std::string> &envs);
void update_throttle_env_internal(const std::map<std::string, std::string>
&envs,
const std::string &key,
- throttling_controller &cntl);
+ utils::throttling_controller &cntl);
// update allowed users for access controller
void update_ac_allowed_users(const std::map<std::string, std::string>
&envs);
@@ -620,10 +620,12 @@ private:
bool _inactive_is_transient; // upgrade to P/S is allowed only iff true
bool _is_initializing; // when initializing, switching to primary
need to update ballot
deny_client _deny_client; // if deny requests
- throttling_controller _write_qps_throttling_controller; // throttling by
requests-per-second
- throttling_controller _write_size_throttling_controller; // throttling by
bytes-per-second
- throttling_controller _read_qps_throttling_controller;
- throttling_controller _backup_request_qps_throttling_controller;
+ utils::throttling_controller
+ _write_qps_throttling_controller; // throttling by requests-per-second
+ utils::throttling_controller
+ _write_size_throttling_controller; // throttling by bytes-per-second
+ utils::throttling_controller _read_qps_throttling_controller;
+ utils::throttling_controller _backup_request_qps_throttling_controller;
// duplication
std::shared_ptr<replica_duplicator_manager> _duplication_mgr;
diff --git a/src/replica/replica_throttle.cpp b/src/replica/replica_throttle.cpp
index ed6bd1cce..1d5ee36ef 100644
--- a/src/replica/replica_throttle.cpp
+++ b/src/replica/replica_throttle.cpp
@@ -42,8 +42,8 @@ namespace replication {
int64_t delay_ms = 0;
\
auto type =
_##op_type##_##throttling_type##_throttling_controller.control( \
request->header->client.timeout_ms, request_units, delay_ms);
\
- if (type != throttling_controller::PASS) {
\
- if (type == throttling_controller::DELAY) {
\
+ if (type != utils::throttling_controller::PASS) {
\
+ if (type == utils::throttling_controller::DELAY) {
\
tasking::enqueue(
\
LPC_##op_type##_THROTTLING_DELAY,
\
&_tracker,
\
@@ -51,7 +51,7 @@ namespace replication {
get_gpid().thread_hash(),
\
std::chrono::milliseconds(delay_ms));
\
METRIC_VAR_INCREMENT(throttling_delayed_##op_type##_requests);
\
- } else { /** type == throttling_controller::REJECT **/
\
+ } else { /** type == utils::throttling_controller::REJECT **/
\
if (delay_ms > 0) {
\
tasking::enqueue(LPC_##op_type##_THROTTLING_DELAY,
\
&_tracker,
\
@@ -87,15 +87,15 @@ bool replica::throttle_backup_request(message_ex *request)
int64_t delay_ms = 0;
auto type = _backup_request_qps_throttling_controller.control(
request->header->client.timeout_ms, 1, delay_ms);
- if (type != throttling_controller::PASS) {
- if (type == throttling_controller::DELAY) {
+ if (type != utils::throttling_controller::PASS) {
+ if (type == utils::throttling_controller::DELAY) {
tasking::enqueue(LPC_read_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() {
on_client_read(req, true); },
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
METRIC_VAR_INCREMENT(throttling_delayed_backup_requests);
- } else { /** type == throttling_controller::REJECT **/
+ } else { /** type == utils::throttling_controller::REJECT **/
METRIC_VAR_INCREMENT(throttling_rejected_backup_requests);
}
return true;
@@ -118,7 +118,7 @@ void replica::update_throttle_envs(const
std::map<std::string, std::string> &env
void replica::update_throttle_env_internal(const std::map<std::string,
std::string> &envs,
const std::string &key,
- throttling_controller &cntl)
+ utils::throttling_controller &cntl)
{
bool throttling_changed = false;
std::string old_throttling;
diff --git a/src/replica/test/throttling_controller_test.cpp
b/src/replica/test/throttling_controller_test.cpp
index 1a17df866..120ad0b4f 100644
--- a/src/replica/test/throttling_controller_test.cpp
+++ b/src/replica/test/throttling_controller_test.cpp
@@ -20,14 +20,14 @@
#include "gtest/gtest.h"
namespace dsn {
-namespace replication {
+namespace utils {
class throttling_controller_test : public ::testing::Test
{
public:
void test_parse_env_basic()
{
- throttling_controller cntl;
+ utils::throttling_controller cntl;
std::string parse_err;
bool env_changed = false;
std::string old_value;
@@ -77,7 +77,7 @@ public:
void test_parse_env_multiplier()
{
- throttling_controller cntl;
+ utils::throttling_controller cntl;
std::string parse_err;
bool env_changed = false;
std::string old_value;
@@ -85,18 +85,16 @@ public:
struct test_case_1
{
std::string env;
-
int64_t delay_units;
int64_t delay_ms;
int64_t reject_units;
int64_t reject_ms;
} test_cases_1[] = {
- {"20K*delay*100", 5000 + 1, 100, 0, 0},
- {"20M*delay*100", 5000 * 1000 + 1, 100, 0, 0},
- {"20M*delay*100,20M*reject*100", 5000 * 1000 + 1, 100, 5000 * 1000
+ 1, 100},
-
+ {"20K*delay*100", (5 << 10) + 1, 100, 0, 0},
+ {"20M*delay*100", (5 << 20) + 1, 100, 0, 0},
+ {"20M*delay*100,20M*reject*100", (5 << 20) + 1, 100, (5 << 20) +
1, 100},
// throttling size exceeds int32_t max value
- {"80000M*delay*100", int64_t(20) * 1000 * 1000 * 1000 + 1, 100, 0,
0},
+ {"80000M*delay*100", (20000ULL << 20) + 1, 100, 0, 0},
};
for (const auto &tc : test_cases_1) {
ASSERT_TRUE(cntl.parse_from_env(tc.env, 4, parse_err, env_changed,
old_value));
@@ -127,5 +125,5 @@ TEST_F(throttling_controller_test, parse_env_basic) {
test_parse_env_basic(); }
TEST_F(throttling_controller_test, parse_env_multiplier) {
test_parse_env_multiplier(); }
-} // namespace replication
+} // namespace utils
} // namespace dsn
diff --git a/src/server/pegasus_manual_compact_service.cpp
b/src/server/pegasus_manual_compact_service.cpp
index 617a7b452..f4d143690 100644
--- a/src/server/pegasus_manual_compact_service.cpp
+++ b/src/server/pegasus_manual_compact_service.cpp
@@ -61,11 +61,6 @@ namespace server {
DEFINE_TASK_CODE(LPC_MANUAL_COMPACT, TASK_PRIORITY_COMMON, THREAD_POOL_COMPACT)
-const std::string
-
pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
-const std::string
-
pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");
-
pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_impl
*app)
: replica_base(*app),
_app(app),
@@ -260,9 +255,9 @@ void
pegasus_manual_compact_service::extract_manual_compact_opts(
find = envs.find(key_prefix +
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION);
if (find != envs.end()) {
const std::string &argv = find->second;
- if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) {
+ if (argv ==
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) {
options.bottommost_level_compaction =
rocksdb::BottommostLevelCompaction::kForce;
- } else if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP) {
+ } else if (argv ==
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP) {
options.bottommost_level_compaction =
rocksdb::BottommostLevelCompaction::kSkip;
} else {
LOG_WARNING_PREFIX(
@@ -270,7 +265,7 @@ void
pegasus_manual_compact_service::extract_manual_compact_opts(
find->first,
find->second,
// NOTICE associate with options.bottommost_level_compaction's
default value above
- MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
+
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
}
}
}
diff --git a/src/server/pegasus_manual_compact_service.h
b/src/server/pegasus_manual_compact_service.h
index 329b10b7b..0f3a8163f 100644
--- a/src/server/pegasus_manual_compact_service.h
+++ b/src/server/pegasus_manual_compact_service.h
@@ -87,9 +87,6 @@ private:
private:
FRIEND_TEST(manual_compact_service_test, extract_manual_compact_opts);
- static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
- static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
-
pegasus_server_impl *_app;
#ifdef PEGASUS_UNIT_TEST
uint64_t _mock_now_timestamp = 0;
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index a12ec788a..35419d8ef 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -148,7 +148,6 @@ const std::chrono::seconds
pegasus_server_impl::kServerStatUpdateTimeSec = std::
const std::string ROCKSDB_ENV_RESTORE_FORCE_RESTORE("restore.force_restore");
const std::string ROCKSDB_ENV_RESTORE_POLICY_NAME("restore.policy_name");
const std::string ROCKSDB_ENV_RESTORE_BACKUP_ID("restore.backup_id");
-const std::string
ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS("rocksdb.checkpoint.reserve_time_seconds");
using cf_opts_setter = std::function<bool(const std::string &,
rocksdb::ColumnFamilyOptions &)>;
const std::unordered_map<std::string, cf_opts_setter> cf_opts_setters = {
@@ -2754,7 +2753,7 @@ void pegasus_server_impl::update_usage_scenario(const
std::map<std::string, std:
// if not specified, default is normal
auto find = envs.find(dsn::replica_envs::ROCKSDB_USAGE_SCENARIO);
std::string new_usage_scenario =
- (find != envs.end() ? find->second :
meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
+ (find != envs.end() ? find->second :
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
if (new_usage_scenario != _usage_scenario) {
std::string old_usage_scenario = _usage_scenario;
if (set_usage_scenario(new_usage_scenario)) {
@@ -2802,7 +2801,7 @@ void pegasus_server_impl::update_checkpoint_reserve(const
std::map<std::string,
return;
}
}
- find = envs.find(ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS);
+ find =
envs.find(dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS);
if (find != envs.end()) {
if (!dsn::buf2int32(find->second, time) || time < 0) {
LOG_ERROR_PREFIX("{}={} is invalid.", find->first, find->second);
@@ -2819,7 +2818,7 @@ void pegasus_server_impl::update_checkpoint_reserve(const
std::map<std::string,
}
if (time != _checkpoint_reserve_time_seconds) {
LOG_INFO_PREFIX("update app env[{}] from \"{}\" to \"{}\" succeed",
- ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS,
+
dsn::replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS,
_checkpoint_reserve_time_seconds,
time);
_checkpoint_reserve_time_seconds = time;
@@ -3064,9 +3063,9 @@ bool pegasus_server_impl::set_usage_scenario(const
std::string &usage_scenario)
return false;
std::string old_usage_scenario = _usage_scenario;
std::unordered_map<std::string, std::string> new_options;
- if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL ||
- usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE)
{
- if (_usage_scenario ==
meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
+ if (usage_scenario == dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL
||
+ usage_scenario ==
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) {
+ if (_usage_scenario ==
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
// old usage scenario is bulk load, reset first
new_options["level0_file_num_compaction_trigger"] =
std::to_string(_data_cf_opts.level0_file_num_compaction_trigger);
@@ -3086,12 +3085,12 @@ bool pegasus_server_impl::set_usage_scenario(const
std::string &usage_scenario)
std::to_string(_data_cf_opts.max_write_buffer_number);
}
- if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
+ if (usage_scenario ==
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size));
new_options["level0_file_num_compaction_trigger"] =
std::to_string(_data_cf_opts.level0_file_num_compaction_trigger);
- } else { // meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE
+ } else { // dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE
uint64_t buffer_size =
dsn::rand::next_u64(_data_cf_opts.write_buffer_size,
_data_cf_opts.write_buffer_size * 2);
new_options["write_buffer_size"] = std::to_string(buffer_size);
@@ -3099,7 +3098,7 @@ bool pegasus_server_impl::set_usage_scenario(const
std::string &usage_scenario)
new_options["level0_file_num_compaction_trigger"] =
std::to_string(std::max<uint64_t>(4UL, max_size /
buffer_size));
}
- } else if (usage_scenario ==
meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
+ } else if (usage_scenario ==
dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
// refer to Options::PrepareForBulkLoad()
new_options["level0_file_num_compaction_trigger"] = "1000000000";
new_options["level0_slowdown_writes_trigger"] = "1000000000";
@@ -3219,9 +3218,9 @@ void pegasus_server_impl::recalculate_data_cf_options(
return;
}
std::unordered_map<std::string, std::string> new_options;
- if (meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario ||
- meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE ==
_usage_scenario) {
- if (meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) {
+ if (dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL ==
_usage_scenario ||
+ dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE ==
_usage_scenario) {
+ if (dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL ==
_usage_scenario) {
UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size,
_data_cf_opts.write_buffer_size);
UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger);
} else {
@@ -3343,11 +3342,12 @@ uint64_t pegasus_server_impl::do_manual_compact(const
rocksdb::CompactRangeOptio
dsn_now_ms() - start_time);
// do compact
- LOG_INFO_PREFIX(
- "start CompactRange, target_level = {}, bottommost_level_compaction =
{}",
- options.target_level,
- options.bottommost_level_compaction ==
rocksdb::BottommostLevelCompaction::kForce ? "force"
-
: "skip");
+ LOG_INFO_PREFIX("start CompactRange, target_level = {},
bottommost_level_compaction = {}",
+ options.target_level,
+ options.bottommost_level_compaction ==
+ rocksdb::BottommostLevelCompaction::kForce
+ ?
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE
+ :
dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
start_time = dsn_now_ms();
auto status = _db->CompactRange(options, _data_cf, nullptr, nullptr);
auto end_time = dsn_now_ms();
diff --git a/src/server/test/manual_compact_service_test.cpp
b/src/server/test/manual_compact_service_test.cpp
index e6972e784..8ccd761b5 100644
--- a/src/server/test/manual_compact_service_test.cpp
+++ b/src/server/test/manual_compact_service_test.cpp
@@ -260,7 +260,7 @@ TEST_P(manual_compact_service_test,
extract_manual_compact_opts)
envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX +
dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "2";
envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] =
-
pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
+ dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
extract_manual_compact_opts(envs,
dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out);
ASSERT_EQ(out.target_level, 2);
ASSERT_EQ(out.bottommost_level_compaction,
rocksdb::BottommostLevelCompaction::kForce);
@@ -268,7 +268,7 @@ TEST_P(manual_compact_service_test,
extract_manual_compact_opts)
envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX +
dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "-1";
envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] =
-
pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
+ dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
extract_manual_compact_opts(envs,
dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out);
ASSERT_EQ(out.target_level, -1);
ASSERT_EQ(out.bottommost_level_compaction,
rocksdb::BottommostLevelCompaction::kSkip);
diff --git a/src/server/test/pegasus_server_impl_test.cpp
b/src/server/test/pegasus_server_impl_test.cpp
index 229927d35..1c57922c4 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -28,7 +28,6 @@
#include <string>
#include <utility>
-#include "base/meta_store.h"
#include "common/replica_envs.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@@ -160,17 +159,18 @@ TEST_P(pegasus_server_impl_test,
test_open_db_with_latest_options)
{
// open a new db with no app env.
ASSERT_EQ(dsn::ERR_OK, start());
- ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
_server->_usage_scenario);
+ ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL,
_server->_usage_scenario);
// set bulk_load scenario for the db.
-
ASSERT_TRUE(_server->set_usage_scenario(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
- ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD,
_server->_usage_scenario);
+ ASSERT_TRUE(
+
_server->set_usage_scenario(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
+ ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD,
_server->_usage_scenario);
rocksdb::Options opts = _server->_db->GetOptions();
ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
ASSERT_EQ(true, opts.disable_auto_compactions);
// reopen the db.
ASSERT_EQ(dsn::ERR_OK, _server->stop(false));
ASSERT_EQ(dsn::ERR_OK, start());
- ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD,
_server->_usage_scenario);
+ ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD,
_server->_usage_scenario);
ASSERT_EQ(opts.level0_file_num_compaction_trigger,
_server->_db->GetOptions().level0_file_num_compaction_trigger);
ASSERT_EQ(opts.disable_auto_compactions,
_server->_db->GetOptions().disable_auto_compactions);
@@ -180,9 +180,9 @@ TEST_P(pegasus_server_impl_test, test_open_db_with_app_envs)
{
std::map<std::string, std::string> envs;
envs[dsn::replica_envs::ROCKSDB_USAGE_SCENARIO] =
- meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
+ dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
ASSERT_EQ(dsn::ERR_OK, start(envs));
- ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD,
_server->_usage_scenario);
+ ASSERT_EQ(dsn::replica_envs::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD,
_server->_usage_scenario);
}
TEST_P(pegasus_server_impl_test, test_open_db_with_rocksdb_envs)
diff --git a/src/utils/test/token_bucket_throttling_controller_test.cpp
b/src/utils/test/token_bucket_throttling_controller_test.cpp
index d3812d484..cecb3b8b9 100644
--- a/src/utils/test/token_bucket_throttling_controller_test.cpp
+++ b/src/utils/test/token_bucket_throttling_controller_test.cpp
@@ -15,85 +15,95 @@
// specific language governing permissions and limitations
// under the License.
-#include "utils/token_bucket_throttling_controller.h"
-
+#include <fmt/core.h>
+#include <stdint.h>
#include <unistd.h>
+#include <limits>
+#include <memory>
+#include <string>
+#include "gmock/gmock.h"
#include "gtest/gtest.h"
+#include "utils/TokenBucket.h"
+#include "utils/test_macros.h"
+#include "utils/throttling_controller.h"
+#include "utils/token_bucket_throttling_controller.h"
namespace dsn {
namespace utils {
-#define INVALIDATE_SITUATION_CHECK(env)
\
- do {
\
- std::string old_value, parse_err;
\
- bool env_changed_result = false;
\
- ASSERT_FALSE(cntl.parse_from_env(env, 4, parse_err,
env_changed_result, old_value)); \
- ASSERT_EQ(env_changed_result, false);
\
- ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K");
\
- ASSERT_EQ(cntl._enabled, true);
\
- ASSERT_EQ(old_value, old_env);
\
- ASSERT_EQ(cntl._env_value, old_env);
\
- } while (0)
-
-#define VALIDATE_SITUATION_CHECK(
\
- env, partition_count, throttle_size, enabled, env_changed, old_env)
\
- do {
\
- bool env_changed_result = false;
\
- std::string old_value, parse_err;
\
- int32_t partitioned_throttle_size = throttle_size / partition_count;
\
- ASSERT_TRUE(
\
- cntl.parse_from_env(env, partition_count, parse_err,
env_changed_result, old_value)); \
- ASSERT_EQ(cntl._env_value, env);
\
- ASSERT_EQ(cntl._partition_count, partition_count);
\
- ASSERT_EQ(cntl._burstsize, partitioned_throttle_size);
\
- ASSERT_EQ(cntl._rate, partitioned_throttle_size);
\
- ASSERT_EQ(cntl._enabled, enabled);
\
- ASSERT_EQ(env_changed_result, env_changed);
\
- ASSERT_EQ(old_value, old_env);
\
- ASSERT_EQ(parse_err, "");
\
- } while (0)
-
class token_bucket_throttling_controller_test : public ::testing::Test
{
public:
- void test_parse_env_basic_token_bucket_throttling()
+ void INVALIDATE_SITUATION_CHECK(const std::string &env)
{
- token_bucket_throttling_controller cntl;
+ std::string old_value, parse_err;
+ bool env_changed_result = false;
+ ASSERT_FALSE(cntl.parse_from_env(env, 4, parse_err,
env_changed_result, old_value));
+ ASSERT_EQ(env_changed_result, false);
+ ASSERT_NE(parse_err, "");
+ ASSERT_EQ(cntl._enabled, true);
+ ASSERT_EQ(old_value, old_env);
+ ASSERT_EQ(cntl._env_value, old_env);
+ }
+ void VALIDATE_SITUATION_CHECK(const std::string &env,
+ int partition_count,
+ uint64_t throttle_size,
+ bool enabled,
+ bool env_changed,
+ const std::string &old_env)
+ {
+ bool env_changed_result = false;
+ std::string old_value, parse_err;
+ int32_t partitioned_throttle_size = throttle_size / partition_count;
+ ASSERT_TRUE(
+ cntl.parse_from_env(env, partition_count, parse_err,
env_changed_result, old_value));
+ ASSERT_EQ(cntl._env_value, env);
+ ASSERT_EQ(cntl._partition_count, partition_count);
+ ASSERT_EQ(cntl._burstsize, partitioned_throttle_size);
+ ASSERT_EQ(cntl._rate, partitioned_throttle_size);
+ ASSERT_EQ(cntl._enabled, enabled);
+ ASSERT_EQ(env_changed_result, env_changed);
+ ASSERT_EQ(old_value, old_env);
+ ASSERT_EQ(parse_err, "");
+ }
+
+ void test_parse_env_basic_token_bucket_throttling()
+ {
// token_bucket_throttling_controller doesn't support delay only
- VALIDATE_SITUATION_CHECK("20000*delay*100", 4, 0, false, true, "");
- VALIDATE_SITUATION_CHECK("200K", 4, 200000, true, true,
"20000*delay*100");
- VALIDATE_SITUATION_CHECK("20000*delay*100,20000*reject*100", 4, 20000,
true, true, "200K");
- VALIDATE_SITUATION_CHECK("20K*delay*100,20K*reject*100",
- 4,
- 20000,
- true,
- true,
- "20000*delay*100,20000*reject*100");
- VALIDATE_SITUATION_CHECK(
- "20000*reject*100", 4, 20000, true, true,
"20K*delay*100,20K*reject*100");
+ NO_FATALS(VALIDATE_SITUATION_CHECK("20000*delay*100", 4, 0, false,
true, ""));
+ NO_FATALS(VALIDATE_SITUATION_CHECK("200K", 4, 200 << 10, true, true,
"20000*delay*100"));
+ NO_FATALS(VALIDATE_SITUATION_CHECK(
+ "20000*delay*100,20000*reject*100", 4, 20000, true, true, "200K"));
+ NO_FATALS(VALIDATE_SITUATION_CHECK("20K*delay*100,20K*reject*100",
+ 4,
+ 20 << 10,
+ true,
+ true,
+
"20000*delay*100,20000*reject*100"));
+ NO_FATALS(VALIDATE_SITUATION_CHECK(
+ "20000*reject*100", 4, 20000, true, true,
"20K*delay*100,20K*reject*100"));
// invalid argument]
- std::string old_env = "20000*reject*100";
- INVALIDATE_SITUATION_CHECK("0");
- INVALIDATE_SITUATION_CHECK("*deldday*100");
- INVALIDATE_SITUATION_CHECK("");
- INVALIDATE_SITUATION_CHECK("*reject");
- INVALIDATE_SITUATION_CHECK("*reject*");
- INVALIDATE_SITUATION_CHECK("reject*");
- INVALIDATE_SITUATION_CHECK("reject");
- INVALIDATE_SITUATION_CHECK("200g");
- INVALIDATE_SITUATION_CHECK("200G");
- INVALIDATE_SITUATION_CHECK("M");
- INVALIDATE_SITUATION_CHECK("K");
- INVALIDATE_SITUATION_CHECK("-1K");
- INVALIDATE_SITUATION_CHECK("1aK");
- INVALIDATE_SITUATION_CHECK("pegNo1");
- INVALIDATE_SITUATION_CHECK("-20");
- INVALIDATE_SITUATION_CHECK("12KM");
- INVALIDATE_SITUATION_CHECK("1K2M");
- INVALIDATE_SITUATION_CHECK("2000K0*reject*100");
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("0"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("*deldday*100"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK(""));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("*reject"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("*reject*"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("reject*"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("reject"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("200g"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("200G"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("M"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("K"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("-1K"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("1aK"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("pegNo1"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("-20"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("12KM"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("1K2M"));
+ NO_FATALS(INVALIDATE_SITUATION_CHECK("2000K0*reject*100"));
}
void throttle_test()
@@ -155,13 +165,58 @@ public:
ASSERT_GT(fail_count1, 10000);
ASSERT_LE(fail_count, fail_count1 * 1.2);
}
+
+private:
+ token_bucket_throttling_controller cntl;
+
+ static const std::string old_env;
};
+const std::string token_bucket_throttling_controller_test::old_env =
"20000*reject*100";
+
TEST_F(token_bucket_throttling_controller_test,
test_parse_env_basic_token_bucket_throttling)
{
test_parse_env_basic_token_bucket_throttling();
}
TEST_F(token_bucket_throttling_controller_test, throttle_test) {
throttle_test(); }
+
+TEST_F(token_bucket_throttling_controller_test, parse_unit_test)
+{
+ std::string max_minus_1 =
std::to_string(std::numeric_limits<uint64_t>::max() - 1);
+ struct parse_unit_test_case
+ {
+ std::string input;
+ bool expected_result;
+ uint64_t expected_units;
+ std::string expected_hint_message_piece;
+ } tests[] = {
+ {"", false, 0, "integer"},
+ {"-", false, 0, "integer"},
+ {"A", false, 0, "integer"},
+ {"M", false, 0, "integer"},
+ {"K", false, 0, "integer"},
+ {"aM", false, 0, "integer"},
+ {"aK", false, 0, "integer"},
+ {fmt::format("{}M", max_minus_1), false, 0, "overflow"},
+ {fmt::format("{}K", max_minus_1), false, 0, "overflow"},
+ {"10M", true, 10 << 20, ""},
+ {"1K", true, 1 << 10, ""},
+ {"100", true, 100, ""},
+ };
+
+ for (const auto &test : tests) {
+ uint64_t actual_units = 0;
+ std::string actual_hint_message;
+ ASSERT_EQ(test.expected_result,
+ throttling_controller::parse_unit(test.input, actual_units,
actual_hint_message));
+ ASSERT_EQ(test.expected_units, actual_units);
+ if (test.expected_result) {
+ ASSERT_EQ(actual_hint_message, "");
+ } else {
+ ASSERT_STR_CONTAINS(actual_hint_message,
test.expected_hint_message_piece);
+ }
+ }
+};
} // namespace utils
} // namespace dsn
diff --git a/src/utils/throttling_controller.cpp
b/src/utils/throttling_controller.cpp
index 2350adb5a..3079b5748 100644
--- a/src/utils/throttling_controller.cpp
+++ b/src/utils/throttling_controller.cpp
@@ -17,6 +17,7 @@
#include "throttling_controller.h"
+#include <fmt/core.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <algorithm>
#include <memory>
@@ -27,7 +28,7 @@
#include "utils/strings.h"
namespace dsn {
-namespace replication {
+namespace utils {
throttling_controller::throttling_controller()
: _enabled(false),
@@ -41,87 +42,177 @@ throttling_controller::throttling_controller()
{
}
+bool throttling_controller::parse_unit(std::string arg,
+ /*out*/ uint64_t &units,
+ /*out*/ std::string &hint_message)
+{
+ hint_message.clear();
+ // Extract multiplier.
+ uint64_t unit_multiplier = 1;
+ if (!arg.empty()) {
+ switch (*arg.rbegin()) {
+ case 'M':
+ unit_multiplier = 1 << 20;
+ break;
+ case 'K':
+ unit_multiplier = 1 << 10;
+ break;
+ default:
+ // Maybe a number, it's valid.
+ break;
+ }
+ // Remove the tail 'M' or 'K'.
+ if (unit_multiplier != 1) {
+ arg.pop_back();
+ }
+ }
+
+ // Parse value.
+ uint64_t tmp;
+ if (!buf2uint64(arg, tmp)) {
+ hint_message = fmt::format("'{}' should be an unsigned integer", arg);
+ return false;
+ }
+
+ // Check overflow.
+ uint64_t result;
+ if (__builtin_mul_overflow(tmp, unit_multiplier, &result)) {
+ hint_message = fmt::format("'{}' result is overflow", arg);
+ return false;
+ }
+ units = tmp * unit_multiplier;
+
+ return true;
+}
+
+throttling_controller::ParseResult
+throttling_controller::parse_from_env(const std::string &arg,
+ const std::string &type,
+ /*out*/ uint64_t &units,
+ /*out*/ uint64_t &delay_ms,
+ /*out*/ std::string &hint_message)
+{
+ hint_message.clear();
+ std::vector<std::string> sub_args;
+ utils::split_args(arg.c_str(), sub_args, '*', true);
+ if (sub_args.size() != 3) {
+ hint_message = fmt::format("The field count of '{}' separated by '*'
must be 3", arg);
+ return ParseResult::kFail;
+ }
+
+ // 1. Check the first part, which must be a positive number, optionally
followed with 'K' or
+ // 'M'.
+ uint64_t u = 0;
+ if (!parse_unit(sub_args[0], u, hint_message)) {
+ return ParseResult::kFail;
+ }
+
+ // 2. Check the second part, which must be "delay" or "reject"
+ if (sub_args[1] != type) {
+ hint_message = fmt::format("'{}' should be 'delay' or 'reject'",
sub_args[1]);
+ return ParseResult::kIgnore;
+ }
+
+ // 3. Check the third part, which must be an unsigned integer.
+ uint64_t ms = 0;
+ if (!buf2uint64(sub_args[2], ms)) {
+ hint_message = fmt::format("'{}' should be an unsigned integer",
sub_args[2]);
+ return ParseResult::kFail;
+ }
+
+ units = u;
+ delay_ms = ms;
+ return ParseResult::kSuccess;
+}
+
bool throttling_controller::parse_from_env(const std::string &env_value,
- int partition_count,
- std::string &parse_error,
- bool &changed,
- std::string &old_env_value)
+ /*out*/ uint64_t &delay_units,
+ /*out*/ uint64_t &delay_ms,
+ /*out*/ uint64_t &reject_units,
+ /*out*/ uint64_t &reject_delay_ms,
+ /*out*/ std::string &hint_message)
{
- changed = false;
- if (_enabled && env_value == _env_value && partition_count ==
_partition_count)
- return true;
+ hint_message.clear();
std::vector<std::string> sargs;
- utils::split_args(env_value.c_str(), sargs, ',', true);
+ utils::split_args(env_value.c_str(), sargs, ',');
if (sargs.empty()) {
- parse_error = "empty env value";
+ hint_message = "The value shouldn't be empty";
return false;
}
+
+ // Example for sarg: 100K*delay*100, 100M*reject*100, etc.
bool delay_parsed = false;
- int64_t delay_units = 0;
- int64_t delay_ms = 0;
bool reject_parsed = false;
- int64_t reject_units = 0;
- int64_t reject_delay_ms = 0;
- for (std::string &s : sargs) {
- std::vector<std::string> sargs1;
- utils::split_args(s.c_str(), sargs1, '*', true);
- if (sargs1.size() != 3) {
- parse_error = "invalid field count, should be 3";
+ for (const auto &sarg : sargs) {
+ uint64_t units = 0;
+ uint64_t ms = 0;
+ // Check the "delay" args.
+ auto result = parse_from_env(sarg, "delay", units, ms, hint_message);
+ if (result == ParseResult::kFail) {
return false;
}
- int64_t unit_multiplier = 1;
- if (!sargs1[0].empty()) {
- if (*sargs1[0].rbegin() == 'M') {
- unit_multiplier = 1000 * 1000;
- } else if (*sargs1[0].rbegin() == 'K') {
- unit_multiplier = 1000;
- }
- if (unit_multiplier != 1) {
- sargs1[0].pop_back();
- }
- }
- int64_t units = 0;
- if (!buf2int64(sargs1[0], units) || units < 0) {
- parse_error = "invalid units, should be non-negative int";
- return false;
- }
- units *= unit_multiplier;
-
- int64_t ms = 0;
- if (!buf2int64(sargs1[2], ms) || ms < 0) {
- parse_error = "invalid delay ms, should be non-negative int";
- return false;
- }
- if (sargs1[1] == "delay") {
+ if (result == ParseResult::kSuccess) {
if (delay_parsed) {
- parse_error = "duplicate delay config";
+ hint_message = "duplicate 'delay' config";
return false;
}
delay_parsed = true;
- delay_units = units / partition_count + 1;
+ delay_units = units;
delay_ms = ms;
- } else if (sargs1[1] == "reject") {
+ continue;
+ }
+
+ // Check the "reject" args.
+ result = parse_from_env(sarg, "reject", units, ms, hint_message);
+ if (result == ParseResult::kSuccess) {
if (reject_parsed) {
- parse_error = "duplicate reject config";
+ hint_message = "duplicate 'reject' config";
return false;
}
reject_parsed = true;
- reject_units = units / partition_count + 1;
+ reject_units = units;
reject_delay_ms = ms;
- } else {
- parse_error = "invalid throttling type";
- return false;
+ continue;
}
+
+ if (hint_message.empty()) {
+ hint_message = fmt::format("only 'delay' or 'reject' is
supported", sarg);
+ }
+ return false;
}
+ return true;
+}
+
+bool throttling_controller::parse_from_env(const std::string &env_value,
+ int partition_count,
+ std::string &hint_message,
+ bool &changed,
+ std::string &old_env_value)
+{
+ hint_message.clear();
+ changed = false;
+ if (_enabled && env_value == _env_value && partition_count ==
_partition_count) {
+ return true;
+ }
+
+ uint64_t delay_units = 0;
+ uint64_t delay_ms = 0;
+ uint64_t reject_units = 0;
+ uint64_t reject_delay_ms = 0;
+ if (!parse_from_env(
+ env_value, delay_units, delay_ms, reject_units, reject_delay_ms,
hint_message)) {
+ return false;
+ }
+
changed = true;
old_env_value = _env_value;
_enabled = true;
_env_value = env_value;
_partition_count = partition_count;
- _delay_units = delay_units;
+ _delay_units = delay_units == 0 ? 0 : (delay_units / partition_count + 1);
_delay_ms = delay_ms;
- _reject_units = reject_units;
+ _reject_units = reject_units == 0 ? 0 : (reject_units / partition_count +
1);
_reject_delay_ms = reject_delay_ms;
return true;
}
@@ -179,5 +270,5 @@ throttling_controller::throttling_type
throttling_controller::control(
return PASS;
}
-} // namespace replication
+} // namespace utils
} // namespace dsn
diff --git a/src/utils/throttling_controller.h
b/src/utils/throttling_controller.h
index 40bc85432..f33c06dcf 100644
--- a/src/utils/throttling_controller.h
+++ b/src/utils/throttling_controller.h
@@ -21,8 +21,7 @@
#include <string>
namespace dsn {
-
-namespace replication {
+namespace utils {
// Used for replica throttling.
// Different throttling strategies may use different 'request_units', which is
@@ -51,14 +50,35 @@ public:
// return true if parse succeed.
// return false if parse failed for the reason of invalid env_value.
// if return false, the original value will not be changed.
- // 'parse_error' is set when return false.
+ // 'hint_message' is set when return false.
// 'changed' is set when return true.
// 'old_env_value' is set when 'changed' is set to true.
+ enum class ParseResult
+ {
+ kSuccess = 0,
+ kFail,
+ kIgnore
+ };
bool parse_from_env(const std::string &env_value,
int partition_count,
- /*out*/ std::string &parse_error,
+ /*out*/ std::string &hint_message,
/*out*/ bool &changed,
/*out*/ std::string &old_env_value);
+ static ParseResult parse_from_env(const std::string &arg,
+ const std::string &type,
+ /*out*/ uint64_t &units,
+ /*out*/ uint64_t &delay_ms,
+ /*out*/ std::string &hint_message);
+ static bool parse_from_env(const std::string &env_value,
+ /*out*/ uint64_t &delay_units,
+ /*out*/ uint64_t &delay_ms,
+ /*out*/ uint64_t &reject_units,
+ /*out*/ uint64_t &reject_delay_ms,
+ /*out*/ std::string &hint_message);
+
+ // Return true if it's in format of '<unsigned integer>[K|M]'
+ static bool
+ parse_unit(std::string arg, /*out*/ uint64_t &units, /*out*/ std::string
&hint_message);
// reset to no throttling.
void reset(/*out*/ bool &changed, /*out*/ std::string &old_env_value);
@@ -85,5 +105,5 @@ private:
int64_t _cur_units;
};
-} // namespace replication
+} // namespace utils
} // namespace dsn
diff --git a/src/utils/token_bucket_throttling_controller.cpp
b/src/utils/token_bucket_throttling_controller.cpp
index e1d5df5d7..323c11cff 100644
--- a/src/utils/token_bucket_throttling_controller.cpp
+++ b/src/utils/token_bucket_throttling_controller.cpp
@@ -23,6 +23,7 @@
#include "string_conv.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
+#include "utils/throttling_controller.h"
namespace dsn {
namespace utils {
@@ -85,7 +86,7 @@ bool token_bucket_throttling_controller::parse_from_env(const
std::string &env_v
return true;
}
- int64_t reject_size_value;
+ uint64_t reject_size_value;
bool enabled;
if (!transform_env_string(env_value, reject_size_value, enabled,
parse_error)) {
return false;
@@ -101,74 +102,67 @@ bool
token_bucket_throttling_controller::parse_from_env(const std::string &env_v
return true;
}
-bool token_bucket_throttling_controller::string_to_value(std::string str,
int64_t &value)
-{
- int64_t unit_multiplier = 1;
- if (*str.rbegin() == 'M') {
- unit_multiplier = 1000 * 1000;
- } else if (*str.rbegin() == 'K') {
- unit_multiplier = 1000;
- }
- if (unit_multiplier != 1) {
- str.pop_back();
- }
- if (!buf2int64(str, value) || value < 0) {
- return false;
- }
- value *= unit_multiplier;
- return true;
-}
-
bool token_bucket_throttling_controller::validate(const std::string &env,
std::string &hint_message)
{
- int64_t temp;
+ uint64_t temp;
bool temp_bool;
- bool validated = transform_env_string(env, temp, temp_bool, hint_message);
- return validated;
+ return transform_env_string(env, temp, temp_bool, hint_message);
};
bool token_bucket_throttling_controller::transform_env_string(const
std::string &env,
- int64_t
&reject_size_value,
+ uint64_t
&reject_size_value,
bool &enabled,
std::string
&hint_message)
{
+ hint_message.clear();
enabled = true;
- if (buf2int64(env, reject_size_value) && reject_size_value > 0) {
+ // format like "200"
+ if (buf2uint64(env, reject_size_value) && reject_size_value > 0) {
return true;
}
// format like "200K"
- if (string_to_value(env, reject_size_value) && reject_size_value > 0) {
+ if (throttling_controller::parse_unit(env, reject_size_value,
hint_message) &&
+ reject_size_value > 0) {
return true;
}
- // format like "20000*delay*100"
- if (env.find("delay") != -1 && env.find("reject") == -1) {
- // rate must > 0 in TokenBucket.h
- reject_size_value = 1;
- enabled = false;
-
- LOG_DEBUG("token_bucket_throttling_controller doesn't support delay
method, so throttling "
- "controller is disabled now");
- return true;
+ // format like "20000*delay*100", it's not supported.
+ {
+ uint64_t units = 0;
+ uint64_t ms = 0;
+ if (throttling_controller::parse_from_env(env, "delay", units, ms,
hint_message) ==
+ throttling_controller::ParseResult::kSuccess) {
+ // rate must > 0 in TokenBucket.h
+ reject_size_value = 1;
+ enabled = false;
+
+ LOG_DEBUG(
+ "token_bucket_throttling_controller doesn't support delay
method, so throttling "
+ "controller is disabled now");
+ return true;
+ }
}
// format like "20000*delay*100,20000*reject*100"
- auto comma_index = env.find(",");
- auto star_index = env.find("*reject", comma_index + 1);
- if (star_index < 0) {
- hint_message = "wrong format, you can set like 20000 or 20K";
- return false;
- }
- auto reject_size = env.substr(comma_index + 1, star_index - comma_index -
1);
-
- if (string_to_value(reject_size, reject_size_value) && reject_size_value >
0) {
- return true;
+ uint64_t reject_units = 0;
+ {
+ uint64_t delay_units = 0;
+ uint64_t delay_ms = 0;
+ uint64_t reject_delay_ms = 0;
+ if (!throttling_controller::parse_from_env(
+ env, delay_units, delay_ms, reject_units, reject_delay_ms,
hint_message)) {
+ return false;
+ }
+
+ if (reject_units == 0) {
+ hint_message = "reject value should be greater than 0";
+ return false;
+ }
}
-
- hint_message = "wrong format, you can set like 20000 or 20K";
- return false;
+ reject_size_value = reject_units;
+ return true;
}
} // namespace utils
diff --git a/src/utils/token_bucket_throttling_controller.h
b/src/utils/token_bucket_throttling_controller.h
index f765e2ab6..a83435ddb 100644
--- a/src/utils/token_bucket_throttling_controller.h
+++ b/src/utils/token_bucket_throttling_controller.h
@@ -85,13 +85,11 @@ public:
bool &changed,
std::string &old_env_value);
- static bool string_to_value(std::string str, int64_t &value);
-
// wrapper of transform_env_string, check if the env string is validated.
static bool validate(const std::string &env, std::string &hint_message);
static bool transform_env_string(const std::string &env,
- int64_t &reject_size_value,
+ uint64_t &reject_size_value,
bool &enabled,
std::string &hint_message);
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]