This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 4c76112a5 refactor(conf): use DSN_DEFINE_uint32 to load uint32 type of
configs (#1352)
4c76112a5 is described below
commit 4c76112a5ded750fd7bf1601ac28b7891cebfe07
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Feb 15 14:16:52 2023 +0800
refactor(conf): use DSN_DEFINE_uint32 to load uint32 type of configs (#1352)
---
src/geo/lib/geo_client.cpp | 15 ++++----
src/meta/test/main.cpp | 12 +++---
src/runtime/rpc/asio_net_provider.cpp | 4 --
src/runtime/rpc/network.cpp | 12 ++++--
src/runtime/rpc/network.h | 1 -
src/runtime/rpc/network.sim.cpp | 25 +++++-------
src/runtime/rpc/network.sim.h | 2 -
src/runtime/rpc/rpc_engine.cpp | 4 +-
src/runtime/rpc/rpc_message.cpp | 23 +++++++----
src/runtime/rpc/rpc_message.h | 3 --
src/runtime/service_engine.cpp | 16 +-------
src/runtime/test/netprovider.cpp | 39 +++++++++----------
src/server/available_detector.cpp | 38 +++++++++----------
src/server/available_detector.h | 2 -
src/server/info_collector.cpp | 42 +++++++++-----------
src/server/info_collector.h | 3 --
src/server/pegasus_server_impl.cpp | 7 +++-
src/server/pegasus_server_impl.h | 2 -
src/server/pegasus_server_impl_init.cpp | 57 ++++++++++++++--------------
src/test/kill_test/data_verifier.cpp | 44 +++++++++++----------
src/test/kill_test/kill_testor.cpp | 17 ++++-----
src/test/kill_test/kill_testor.h | 4 --
src/test/kill_test/partition_kill_testor.cpp | 8 +++-
src/test/kill_test/process_kill_testor.cpp | 36 +++++++++++-------
24 files changed, 197 insertions(+), 219 deletions(-)
diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp
index 83daf855c..bb039d52d 100644
--- a/src/geo/lib/geo_client.cpp
+++ b/src/geo/lib/geo_client.cpp
@@ -68,6 +68,8 @@ DSN_DEFINE_group_validator(min_max_level, [](std::string
&message) -> bool {
}
return true;
});
+DSN_DEFINE_uint32(geo_client.lib, latitude_index, 5, "latitude index in
value");
+DSN_DEFINE_uint32(geo_client.lib, longitude_index, 4, "longitude index in
value");
struct SearchResultNearer
{
@@ -99,14 +101,11 @@ geo_client::geo_client(const char *config_file,
_geo_data_client = pegasus_client_factory::get_client(cluster_name,
geo_app_name);
CHECK_NOTNULL(_geo_data_client, "init pegasus _geo_data_client failed");
- uint32_t latitude_index = (uint32_t)dsn_config_get_value_uint64(
- "geo_client.lib", "latitude_index", 5, "latitude index in value");
-
- uint32_t longitude_index = (uint32_t)dsn_config_get_value_uint64(
- "geo_client.lib", "longitude_index", 4, "longitude index in value");
-
- dsn::error_s s = _codec.set_latlng_indices(latitude_index,
longitude_index);
- CHECK(s.is_ok(), "set_latlng_indices({}, {}) failed", latitude_index,
longitude_index);
+ dsn::error_s s = _codec.set_latlng_indices(FLAGS_latitude_index,
FLAGS_longitude_index);
+ CHECK(s.is_ok(),
+ "set_latlng_indices({}, {}) failed",
+ FLAGS_latitude_index,
+ FLAGS_longitude_index);
}
dsn::error_s geo_client::set_max_level(int level)
diff --git a/src/meta/test/main.cpp b/src/meta/test/main.cpp
index 4691351ba..bfc09b220 100644
--- a/src/meta/test/main.cpp
+++ b/src/meta/test/main.cpp
@@ -47,6 +47,8 @@ DEFINE_TASK_CODE(TASK_META_TEST, TASK_PRIORITY_COMMON,
THREAD_POOL_META_TEST)
meta_service_test_app *g_app;
+DSN_DEFINE_uint32(tools.simulator, random_seed, 0, "random seed");
+
// as it is not easy to clean test environment in some cases, we simply run
these tests in several
// commands,
// please check the script "run.sh" to modify the GTEST_FILTER
@@ -80,13 +82,11 @@ TEST(meta, app_envs_basic_test) {
g_app->app_envs_basic_test(); }
dsn::error_code meta_service_test_app::start(const std::vector<std::string>
&args)
{
- uint32_t seed =
- (uint32_t)dsn_config_get_value_uint64("tools.simulator",
"random_seed", 0, "random seed");
- if (seed == 0) {
- seed = time(0);
- LOG_ERROR("initial seed: {}", seed);
+ if (FLAGS_random_seed == 0) {
+ FLAGS_random_seed = static_cast<uint32_t>(time(nullptr));
+ LOG_INFO("initial seed: {}", FLAGS_random_seed);
}
- srand(seed);
+ srand(FLAGS_random_seed);
int argc = args.size();
char *argv[20];
diff --git a/src/runtime/rpc/asio_net_provider.cpp
b/src/runtime/rpc/asio_net_provider.cpp
index a43c44c34..f0d7790c3 100644
--- a/src/runtime/rpc/asio_net_provider.cpp
+++ b/src/runtime/rpc/asio_net_provider.cpp
@@ -72,10 +72,6 @@ error_code asio_network_provider::start(rpc_channel channel,
int port, bool clie
if (_acceptor != nullptr)
return ERR_SERVICE_ALREADY_RUNNING;
- // get connection threshold from config, default value 0 means no threshold
- _cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
- "network", "conn_threshold_per_ip", 0, "max connection count to each
server per ip");
-
for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);
diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp
index 08c584af9..83ee02266 100644
--- a/src/runtime/rpc/network.cpp
+++ b/src/runtime/rpc/network.cpp
@@ -35,6 +35,11 @@
#include "utils/strings.h"
namespace dsn {
+DSN_DEFINE_uint32(network,
+ conn_threshold_per_ip,
+ 0,
+ "max connection count to each server per ip, 0 means no
limit");
+
/*static*/ join_point<void, rpc_session *>
rpc_session::on_rpc_session_connected("rpc.session.connected");
/*static*/ join_point<void, rpc_session *>
@@ -582,7 +587,6 @@ uint32_t network::get_local_ipv4()
connection_oriented_network::connection_oriented_network(rpc_engine *srv,
network *inner_provider)
: network(srv, inner_provider)
{
- _cfg_conn_threshold_per_ip = 0;
_client_session_count.init_global_counter("server",
"network",
"client_session_count",
@@ -744,7 +748,7 @@ void
connection_oriented_network::on_server_session_disconnected(rpc_session_ptr
bool
connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_address
ep)
{
- if (_cfg_conn_threshold_per_ip <= 0) {
+ if (FLAGS_conn_threshold_per_ip <= 0) {
LOG_DEBUG("new client from {} is connecting to server {}, no
connection threshold",
ep.ipv4_str(),
address());
@@ -760,7 +764,7 @@ bool
connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_ad
ip_conn_count = it->second;
}
}
- if (ip_conn_count >= _cfg_conn_threshold_per_ip) {
+ if (ip_conn_count >= FLAGS_conn_threshold_per_ip) {
exceeded = true;
}
@@ -769,7 +773,7 @@ bool
connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_ad
ep.ipv4_str(),
address(),
ip_conn_count,
- _cfg_conn_threshold_per_ip);
+ FLAGS_conn_threshold_per_ip);
return exceeded;
}
diff --git a/src/runtime/rpc/network.h b/src/runtime/rpc/network.h
index 63c04135c..ab6746323 100644
--- a/src/runtime/rpc/network.h
+++ b/src/runtime/rpc/network.h
@@ -185,7 +185,6 @@ protected:
ip_connection_count _ip_conn_count; // from_ip => connection count
utils::rw_lock_nr _servers_lock;
- uint32_t _cfg_conn_threshold_per_ip;
perf_counter_wrapper _client_session_count;
};
diff --git a/src/runtime/rpc/network.sim.cpp b/src/runtime/rpc/network.sim.cpp
index 7ff0a678b..3f3453b92 100644
--- a/src/runtime/rpc/network.sim.cpp
+++ b/src/runtime/rpc/network.sim.cpp
@@ -42,10 +42,17 @@
#include "utils/rand.h"
#include "runtime/node_scoper.h"
#include "network.sim.h"
+#include "utils/flags.h"
namespace dsn {
namespace tools {
+DSN_DEFINE_uint32(tools.simulator, min_message_delay_microseconds, 1, "min
message delay (us)");
+DSN_DEFINE_uint32(tools.simulator,
+ max_message_delay_microseconds,
+ 100000,
+ "max message delay (us)");
+
// switch[channel][header_format]
// multiple machines connect to the same switch
// 10 should be >= than rpc_channel::max_value() + 1
@@ -153,20 +160,6 @@ sim_network_provider::sim_network_provider(rpc_engine
*rpc, network *inner_provi
: connection_oriented_network(rpc, inner_provider)
{
_address.assign_ipv4("localhost", 1);
-
- _min_message_delay_microseconds = 1;
- _max_message_delay_microseconds = 100000;
-
- _min_message_delay_microseconds =
- (uint32_t)dsn_config_get_value_uint64("tools.simulator",
- "min_message_delay_microseconds",
- _min_message_delay_microseconds,
- "min message delay (us)");
- _max_message_delay_microseconds =
- (uint32_t)dsn_config_get_value_uint64("tools.simulator",
- "max_message_delay_microseconds",
- _max_message_delay_microseconds,
- "max message delay (us)");
}
error_code sim_network_provider::start(rpc_channel channel, int port, bool
client_only)
@@ -194,8 +187,8 @@ error_code sim_network_provider::start(rpc_channel channel,
int port, bool clien
uint32_t sim_network_provider::net_delay_milliseconds() const
{
- return static_cast<uint32_t>(
- rand::next_u32(_min_message_delay_microseconds,
_max_message_delay_microseconds)) /
+ return rand::next_u32(FLAGS_min_message_delay_microseconds,
+ FLAGS_max_message_delay_microseconds) /
1000;
}
}
diff --git a/src/runtime/rpc/network.sim.h b/src/runtime/rpc/network.sim.h
index 42a915bc4..1f7437bda 100644
--- a/src/runtime/rpc/network.sim.h
+++ b/src/runtime/rpc/network.sim.h
@@ -99,8 +99,6 @@ public:
private:
::dsn::rpc_address _address;
- uint32_t _min_message_delay_microseconds;
- uint32_t _max_message_delay_microseconds;
};
//------------- inline implementations -------------
diff --git a/src/runtime/rpc/rpc_engine.cpp b/src/runtime/rpc/rpc_engine.cpp
index 8644d7839..6cf18186f 100644
--- a/src/runtime/rpc/rpc_engine.cpp
+++ b/src/runtime/rpc/rpc_engine.cpp
@@ -39,8 +39,10 @@
#include "runtime/rpc/serialization.h"
#include "utils/rand.h"
#include <set>
+#include "utils/flags.h"
namespace dsn {
+DSN_DECLARE_uint32(local_hash);
DEFINE_TASK_CODE(LPC_RPC_TIMEOUT, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
@@ -735,7 +737,7 @@ void rpc_engine::reply(message_ex *response, error_code err)
sizeof(response->header->server.error_name) - 1);
response->header->server.error_name[sizeof(response->header->server.error_name)
- 1] = '\0';
response->header->server.error_code.local_code = err;
- response->header->server.error_code.local_hash = message_ex::s_local_hash;
+ response->header->server.error_code.local_hash = FLAGS_local_hash;
// response rpc code may be TASK_CODE_INVALID when request rpc code is not
exist
auto sp = response->local_rpc_code == TASK_CODE_INVALID
diff --git a/src/runtime/rpc/rpc_message.cpp b/src/runtime/rpc/rpc_message.cpp
index 6b06621e9..e6113a25b 100644
--- a/src/runtime/rpc/rpc_message.cpp
+++ b/src/runtime/rpc/rpc_message.cpp
@@ -32,13 +32,20 @@
#include <cctype>
#include "runtime/task/task_engine.h"
+#include "utils/flags.h"
using namespace dsn::utils;
namespace dsn {
+// init common for all per-node providers
+DSN_DEFINE_uint32(core,
+ local_hash,
+ 0,
+ "a same hash value from two processes indicate the rpc codes
are registered in "
+ "the same order, and therefore the mapping between rpc code
string and integer "
+ "is the same, which we leverage for fast rpc handler lookup
optimization");
std::atomic<uint64_t> message_ex::_id(0);
-uint32_t message_ex::s_local_hash = 0;
message_ex::message_ex()
: header(nullptr),
@@ -82,11 +89,11 @@ error_code message_ex::error()
{
dsn::error_code code;
auto binary_hash = header->server.error_code.local_hash;
- if (binary_hash != 0 && binary_hash == ::dsn::message_ex::s_local_hash) {
+ if (binary_hash != 0 && binary_hash == FLAGS_local_hash) {
code = dsn::error_code(header->server.error_code.local_code);
} else {
code = error_code::try_get(header->server.error_name,
dsn::ERR_UNKNOWN);
- header->server.error_code.local_hash = ::dsn::message_ex::s_local_hash;
+ header->server.error_code.local_hash = FLAGS_local_hash;
header->server.error_code.local_code = code;
}
return code;
@@ -99,11 +106,11 @@ task_code message_ex::rpc_code()
}
auto binary_hash = header->rpc_code.local_hash;
- if (binary_hash != 0 && binary_hash == ::dsn::message_ex::s_local_hash) {
+ if (binary_hash != 0 && binary_hash == FLAGS_local_hash) {
local_rpc_code = dsn::task_code(header->rpc_code.local_code);
} else {
local_rpc_code = dsn::task_code::try_get(header->rpc_name,
::dsn::TASK_CODE_INVALID);
- header->rpc_code.local_hash = ::dsn::message_ex::s_local_hash;
+ header->rpc_code.local_hash = FLAGS_local_hash;
header->rpc_code.local_code = local_rpc_code.code();
}
@@ -307,7 +314,7 @@ message_ex *message_ex::create_request(dsn::task_code
rpc_code,
strncpy(hdr.rpc_name, sp->name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = (uint32_t)rpc_code;
- hdr.rpc_code.local_hash = s_local_hash;
+ hdr.rpc_code.local_hash = FLAGS_local_hash;
hdr.id = new_id();
@@ -348,7 +355,7 @@ message_ex *message_ex::create_response()
strncpy(hdr.rpc_name, response_sp->name.c_str(), sizeof(hdr.rpc_name)
- 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = msg->local_rpc_code;
- hdr.rpc_code.local_hash = s_local_hash;
+ hdr.rpc_code.local_hash = FLAGS_local_hash;
// join point
request_sp->on_rpc_create_response.execute(this, msg);
@@ -359,7 +366,7 @@ message_ex *message_ex::create_response()
strncpy(hdr.rpc_name, ack_rpc_name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = TASK_CODE_INVALID;
- hdr.rpc_code.local_hash = s_local_hash;
+ hdr.rpc_code.local_hash = FLAGS_local_hash;
}
return msg;
diff --git a/src/runtime/rpc/rpc_message.h b/src/runtime/rpc/rpc_message.h
index 25bde2a80..4c65c0dc6 100644
--- a/src/runtime/rpc/rpc_message.h
+++ b/src/runtime/rpc/rpc_message.h
@@ -233,9 +233,6 @@ private:
int _rw_offset; // current buffer offset
bool _rw_committed; // mark if it is in middle state of reading/writing
bool _is_read; // is for read(recv) or write(send)
-
-public:
- static uint32_t s_local_hash; // used by fast_rpc_name
};
typedef dsn::ref_ptr<message_ex> message_ptr;
diff --git a/src/runtime/service_engine.cpp b/src/runtime/service_engine.cpp
index 45e187e7e..f46aea738 100644
--- a/src/runtime/service_engine.cpp
+++ b/src/runtime/service_engine.cpp
@@ -194,21 +194,7 @@ service_engine::service_engine()
service_engine::~service_engine() { _nodes_by_app_id.clear(); }
-void service_engine::init_before_toollets(const service_spec &spec)
-{
- _spec = spec;
-
- // init common for all per-node providers
- message_ex::s_local_hash =
- (uint32_t)dsn_config_get_value_uint64("core",
- "local_hash",
- 0,
- "a same hash value from two
processes indicate the "
- "rpc code are registered in the
same order, "
- "and therefore the mapping
between rpc code string "
- "and integer is the same, which
we leverage "
- "for fast rpc handler lookup
optimization");
-}
+void service_engine::init_before_toollets(const service_spec &spec) { _spec =
spec; }
void service_engine::init_after_toollets()
{
diff --git a/src/runtime/test/netprovider.cpp b/src/runtime/test/netprovider.cpp
index f43e8bf3b..5d9f93444 100644
--- a/src/runtime/test/netprovider.cpp
+++ b/src/runtime/test/netprovider.cpp
@@ -60,25 +60,18 @@
#include "runtime/rpc/rpc_engine.h"
#include "runtime/service_engine.h"
#include "test_utils.h"
+#include "utils/flags.h"
-using namespace dsn;
-using namespace dsn::tools;
+namespace dsn {
+DSN_DECLARE_uint32(conn_threshold_per_ip);
-class asio_network_provider_test : public asio_network_provider
+class asio_network_provider_test : public tools::asio_network_provider
{
public:
asio_network_provider_test(rpc_engine *srv, network *inner_provider)
- : asio_network_provider(srv, inner_provider)
+ : tools::asio_network_provider(srv, inner_provider)
{
}
-
-public:
- void change_test_cfg_conn_threshold_per_ip(uint32_t n)
- {
- LOG_INFO(
- "change _cfg_conn_threshold_per_ip {} -> {} for test",
_cfg_conn_threshold_per_ip, n);
- _cfg_conn_threshold_per_ip = n;
- }
};
static int TEST_PORT = 20401;
@@ -158,8 +151,8 @@ TEST(tools_common, asio_net_provider)
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
- std::unique_ptr<asio_network_provider> asio_network(
- new asio_network_provider(task::get_current_rpc(), nullptr));
+ std::unique_ptr<tools::asio_network_provider> asio_network(
+ new tools::asio_network_provider(task::get_current_rpc(), nullptr));
error_code start_result;
start_result = asio_network->start(RPC_CHANNEL_TCP, TEST_PORT, true);
@@ -172,8 +165,8 @@ TEST(tools_common, asio_net_provider)
rpc_address network_addr = asio_network->address();
ASSERT_TRUE(network_addr.port() == TEST_PORT);
- std::unique_ptr<asio_network_provider> asio_network2(
- new asio_network_provider(task::get_current_rpc(), nullptr));
+ std::unique_ptr<tools::asio_network_provider> asio_network2(
+ new tools::asio_network_provider(task::get_current_rpc(), nullptr));
start_result = asio_network2->start(RPC_CHANNEL_TCP, TEST_PORT, true);
ASSERT_TRUE(start_result == ERR_OK);
@@ -205,8 +198,8 @@ TEST(tools_common, asio_udp_provider)
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
- std::unique_ptr<asio_udp_provider> client(
- new asio_udp_provider(task::get_current_rpc(), nullptr));
+ std::unique_ptr<tools::asio_udp_provider> client(
+ new tools::asio_udp_provider(task::get_current_rpc(), nullptr));
error_code start_result;
start_result = client->start(RPC_CHANNEL_UDP, 0, true);
@@ -248,8 +241,8 @@ TEST(tools_common, sim_net_provider)
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
- std::unique_ptr<sim_network_provider> sim_net(
- new sim_network_provider(task::get_current_rpc(), nullptr));
+ std::unique_ptr<tools::sim_network_provider> sim_net(
+ new tools::sim_network_provider(task::get_current_rpc(), nullptr));
error_code ans;
ans = sim_net->start(RPC_CHANNEL_TCP, TEST_PORT, false);
@@ -286,7 +279,10 @@ TEST(tools_common,
asio_network_provider_connection_threshold)
ASSERT_TRUE(start_result == ERR_OK);
auto CONN_THRESHOLD = 3;
- asio_network->change_test_cfg_conn_threshold_per_ip(CONN_THRESHOLD);
+ LOG_INFO("change FLAGS_conn_threshold_per_ip {} -> {} for test",
+ FLAGS_conn_threshold_per_ip,
+ CONN_THRESHOLD);
+ FLAGS_conn_threshold_per_ip = CONN_THRESHOLD;
// not exceed threshold
for (int count = 0; count < CONN_THRESHOLD + 2; count++) {
@@ -318,3 +314,4 @@ TEST(tools_common,
asio_network_provider_connection_threshold)
TEST_PORT++;
}
+} // namespace dsn
diff --git a/src/server/available_detector.cpp
b/src/server/available_detector.cpp
index 02906e7b7..6f2b83412 100644
--- a/src/server/available_detector.cpp
+++ b/src/server/available_detector.cpp
@@ -38,7 +38,14 @@ DSN_DEFINE_int32(pegasus.collector,
available_detect_alert_fail_count,
30,
"available detect alert fail count");
-
+DSN_DEFINE_uint32(pegasus.collector,
+ available_detect_interval_seconds,
+ 3,
+ "detect interval seconds");
+DSN_DEFINE_uint32(pegasus.collector,
+ available_detect_timeout,
+ 1000,
+ "available detect timeout in millisecond");
available_detector::available_detector()
: _client(nullptr),
_ddl_client(nullptr),
@@ -69,16 +76,6 @@ available_detector::available_detector()
_meta_list.clear();
dsn::replication::replica_helper::load_meta_servers(_meta_list);
CHECK(!_meta_list.empty(), "");
- _detect_interval_seconds =
- (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
-
"available_detect_interval_seconds",
- 3, // default value 3s
- "detect interval seconds");
- _detect_timeout =
- (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
- "available_detect_timeout",
- 1000, // unit is
millisecond,default is 1s = 1000ms
- "available detect timeout");
// initialize the _client.
if (!pegasus_client_factory::initialize(nullptr)) {
CHECK(false, "Initialize the pegasus client failed");
@@ -175,11 +172,11 @@ void available_detector::detect_available()
_fail_count[i].reset(new std::atomic<int32_t>(0));
_fail_count[i]->store(0);
auto call_func = std::bind(&available_detector::on_detect, this, i);
- _detect_tasks[i] =
- ::dsn::tasking::enqueue_timer(LPC_DETECT_AVAILABLE,
- &_tracker,
- std::move(call_func),
-
std::chrono::seconds(_detect_interval_seconds));
+ _detect_tasks[i] = ::dsn::tasking::enqueue_timer(
+ LPC_DETECT_AVAILABLE,
+ &_tracker,
+ std::move(call_func),
+ std::chrono::seconds(FLAGS_available_detect_interval_seconds));
}
}
@@ -341,12 +338,15 @@ void available_detector::on_detect(int32_t idx)
check_and_send_email(&cnt, idx);
} else {
LOG_DEBUG("async_set partition[{}] ok, hash_key = {}", idx,
_hash_keys[idx]);
- _client->async_get(
- _hash_keys[idx], "", std::move(user_async_get_callback),
_detect_timeout);
+ _client->async_get(_hash_keys[idx],
+ "",
+ std::move(user_async_get_callback),
+ FLAGS_available_detect_timeout);
}
};
- _client->async_set(_hash_keys[idx], "", value,
std::move(async_set_callback), _detect_timeout);
+ _client->async_set(
+ _hash_keys[idx], "", value, std::move(async_set_callback),
FLAGS_available_detect_timeout);
}
void available_detector::check_and_send_email(std::atomic<int> *cnt, int32_t
idx)
diff --git a/src/server/available_detector.h b/src/server/available_detector.h
index 9af26f757..ea7621c5a 100644
--- a/src/server/available_detector.h
+++ b/src/server/available_detector.h
@@ -61,7 +61,6 @@ private:
pegasus_client *_client;
std::shared_ptr<replication_ddl_client> _ddl_client;
std::vector<dsn::rpc_address> _meta_list;
- uint32_t _detect_interval_seconds;
::dsn::utils::ex_lock_nr _alert_lock;
// for record partition fail times.
std::vector<std::shared_ptr<std::atomic<int32_t>>> _fail_count;
@@ -72,7 +71,6 @@ private:
int32_t _app_id;
int32_t _partition_count;
std::vector<::dsn::partition_configuration> partitions;
- uint32_t _detect_timeout;
std::string _send_alert_email_cmd;
std::string _send_availability_info_email_cmd;
diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp
index ec2307085..c802cb723 100644
--- a/src/server/info_collector.cpp
+++ b/src/server/info_collector.cpp
@@ -45,6 +45,16 @@ DEFINE_TASK_CODE(LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)
+DSN_DEFINE_uint32(pegasus.collector, app_stat_interval_seconds, 10, "app stat
interval seconds");
+DSN_DEFINE_uint32(pegasus.collector,
+ capacity_unit_fetch_interval_seconds,
+ 8,
+ "capacity unit fetch interval seconds");
+DSN_DEFINE_uint32(pegasus.collector,
+ storage_size_fetch_interval_seconds,
+ 3600,
+ "storage size fetch interval seconds");
+
info_collector::info_collector()
{
std::vector<::dsn::rpc_address> meta_servers;
@@ -62,11 +72,6 @@ info_collector::info_collector()
_shell_context->meta_list = meta_servers;
_shell_context->ddl_client.reset(new replication_ddl_client(meta_servers));
- _app_stat_interval_seconds =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
-
"app_stat_interval_seconds",
- 10, //
default value 10s
- "app
stat interval seconds");
-
_usage_stat_app = dsn_config_get_value_string(
"pegasus.collector", "usage_stat_app", "", "app for recording usage
statistics");
CHECK(!_usage_stat_app.empty(), "");
@@ -76,29 +81,18 @@ info_collector::info_collector()
CHECK_NOTNULL(_client, "Initialize the client failed");
_result_writer = dsn::make_unique<result_writer>(_client);
- _capacity_unit_fetch_interval_seconds =
- (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
-
"capacity_unit_fetch_interval_seconds",
- 8, // default value 8s
- "capacity unit fetch interval
seconds");
// _capacity_unit_retry_wait_seconds is in range of [1, 10]
_capacity_unit_retry_wait_seconds =
- std::min(10u, std::max(1u, _capacity_unit_fetch_interval_seconds /
10));
+ std::min(10u, std::max(1u, FLAGS_capacity_unit_fetch_interval_seconds
/ 10));
// _capacity_unit_retry_max_count is in range of [0, 3]
- _capacity_unit_retry_max_count =
- std::min(3u, _capacity_unit_fetch_interval_seconds /
_capacity_unit_retry_wait_seconds);
-
- _storage_size_fetch_interval_seconds =
- (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
-
"storage_size_fetch_interval_seconds",
- 3600, // default value 1h
- "storage size fetch interval
seconds");
+ _capacity_unit_retry_max_count = std::min(
+ 3u, FLAGS_capacity_unit_fetch_interval_seconds /
_capacity_unit_retry_wait_seconds);
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
- std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
+ std::min(60u, std::max(1u, FLAGS_storage_size_fetch_interval_seconds /
10));
// _storage_size_retry_max_count is in range of [0, 3]
_storage_size_retry_max_count =
- std::min(3u, _storage_size_fetch_interval_seconds /
_storage_size_retry_wait_seconds);
+ std::min(3u, FLAGS_storage_size_fetch_interval_seconds /
_storage_size_retry_wait_seconds);
}
info_collector::~info_collector()
@@ -115,7 +109,7 @@ void info_collector::start()
::dsn::tasking::enqueue_timer(LPC_PEGASUS_APP_STAT_TIMER,
&_tracker,
[this] { on_app_stat(); },
-
std::chrono::seconds(_app_stat_interval_seconds),
+
std::chrono::seconds(FLAGS_app_stat_interval_seconds),
0,
std::chrono::minutes(1));
@@ -123,7 +117,7 @@ void info_collector::start()
LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
&_tracker,
[this] { on_capacity_unit_stat(_capacity_unit_retry_max_count); },
- std::chrono::seconds(_capacity_unit_fetch_interval_seconds),
+ std::chrono::seconds(FLAGS_capacity_unit_fetch_interval_seconds),
0,
std::chrono::minutes(1));
@@ -131,7 +125,7 @@ void info_collector::start()
LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
&_tracker,
[this] { on_storage_size_stat(_storage_size_retry_max_count); },
- std::chrono::seconds(_storage_size_fetch_interval_seconds),
+ std::chrono::seconds(FLAGS_storage_size_fetch_interval_seconds),
0,
std::chrono::minutes(1));
}
diff --git a/src/server/info_collector.h b/src/server/info_collector.h
index d0187eea2..2d9240abc 100644
--- a/src/server/info_collector.h
+++ b/src/server/info_collector.h
@@ -225,7 +225,6 @@ private:
::dsn::rpc_address _meta_servers;
std::string _cluster_name;
std::shared_ptr<shell_context> _shell_context;
- uint32_t _app_stat_interval_seconds;
::dsn::task_ptr _app_stat_timer_task;
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
std::map<std::string, app_stat_counters *> _app_stat_counters;
@@ -236,11 +235,9 @@ private:
pegasus_client *_client;
// for writing cu stat result
std::unique_ptr<result_writer> _result_writer;
- uint32_t _capacity_unit_fetch_interval_seconds;
uint32_t _capacity_unit_retry_wait_seconds;
uint32_t _capacity_unit_retry_max_count;
::dsn::task_ptr _capacity_unit_stat_timer_task;
- uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
::dsn::task_ptr _storage_size_stat_timer_task;
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 3b9688e35..4629304b9 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -51,6 +51,8 @@ namespace server {
DEFINE_TASK_CODE(LPC_PEGASUS_SERVER_DELAY, TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)
DSN_DECLARE_int32(read_amp_bytes_per_bit);
+DSN_DECLARE_uint32(checkpoint_reserve_min_count);
+DSN_DECLARE_uint32(checkpoint_reserve_time_seconds);
DSN_DEFINE_int32(pegasus.server,
hotkey_analyse_time_interval_s,
@@ -2622,10 +2624,11 @@ void pegasus_server_impl::update_default_ttl(const
std::map<std::string, std::st
}
}
+// TODO(yingchun): change by http
void pegasus_server_impl::update_checkpoint_reserve(const
std::map<std::string, std::string> &envs)
{
- int32_t count = _checkpoint_reserve_min_count_in_config;
- int32_t time = _checkpoint_reserve_time_seconds_in_config;
+ int32_t count = FLAGS_checkpoint_reserve_min_count;
+ int32_t time = FLAGS_checkpoint_reserve_time_seconds;
auto find = envs.find(ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT);
if (find != envs.end()) {
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index c907f05b4..84687353c 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -456,8 +456,6 @@ private:
std::unique_ptr<capacity_unit_calculator> _cu_calculator;
std::unique_ptr<pegasus_server_write> _server_write;
- uint32_t _checkpoint_reserve_min_count_in_config;
- uint32_t _checkpoint_reserve_time_seconds_in_config;
uint32_t _checkpoint_reserve_min_count;
uint32_t _checkpoint_reserve_time_seconds;
std::atomic_bool _is_checkpointing; // whether the db is doing
checkpoint
diff --git a/src/server/pegasus_server_impl_init.cpp
b/src/server/pegasus_server_impl_init.cpp
index 012ca6ac1..571f7e09b 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -176,6 +176,25 @@ DSN_DEFINE_uint64(pegasus.server,
"specify the maximal numbers of info log files to be kept:
once the number of "
"info logs goes beyond this option, stale log files will be
cleaned.");
+DSN_DEFINE_uint32(pegasus.server,
+ rocksdb_multi_get_max_iteration_count,
+ 3000,
+ "max iteration count for each range read for multi-get
operation, if exceed this "
+ "threshold, iterator will be stopped.");
+DSN_DEFINE_uint32(pegasus.server,
+ rocksdb_max_iteration_count,
+ 1000,
+ "max iteration count for each range read, if exceed this
threshold, iterator "
+ "will be stopped.");
+DSN_DEFINE_uint32(pegasus.server,
+ checkpoint_reserve_min_count,
+ 2,
+ "Minimum count of checkpoint to reserve.");
+DSN_DEFINE_uint32(pegasus.server,
+ checkpoint_reserve_time_seconds,
+ 1800,
+ "Minimum seconds of checkpoint to reserve, 0 means no
check.");
+
static const std::unordered_map<std::string,
rocksdb::BlockBasedTableOptions::IndexType>
INDEX_TYPE_STRING_MAP = {
{"binary_search",
rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch},
@@ -236,13 +255,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
1000,
"multi-get operation iterate count exceed this threshold will be
logged, 0 means no check");
- _rng_rd_opts.multi_get_max_iteration_count =
(uint32_t)dsn_config_get_value_uint64(
- "pegasus.server",
- "rocksdb_multi_get_max_iteration_count",
- 3000,
- "max iteration count for each range read for multi-get operation, if "
- "exceed this threshold,"
- "iterator will be stopped");
+ _rng_rd_opts.multi_get_max_iteration_count =
FLAGS_rocksdb_multi_get_max_iteration_count;
_rng_rd_opts.multi_get_max_iteration_size =
dsn_config_get_value_uint64("pegasus.server",
@@ -251,13 +264,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"multi-get operation total key-value size
exceed "
"this threshold will stop iterating
rocksdb, 0 means no check");
- _rng_rd_opts.rocksdb_max_iteration_count =
- (uint32_t)dsn_config_get_value_uint64("pegasus.server",
- "rocksdb_max_iteration_count",
- 1000,
- "max iteration count for each
range "
- "read, if exceed this threshold,
"
- "iterator will be stopped");
+ _rng_rd_opts.rocksdb_max_iteration_count =
FLAGS_rocksdb_max_iteration_count;
_rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config =
dsn_config_get_value_uint64(
"pegasus.server",
@@ -283,12 +290,13 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
false,
"rocksdb
options.use_direct_io_for_flush_and_compaction");
+ // TODO(yingchun): size_t, uint64_t
_db_opts.compaction_readahead_size =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_compaction_readahead_size",
2 * 1024 * 1024,
"rocksdb
options.compaction_readahead_size");
-
+ // TODO(yingchun): size_t, uint64_t
_db_opts.writable_file_max_buffer_size =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_writable_file_max_buffer_size",
@@ -305,6 +313,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_db_opts.max_background_compactions =
FLAGS_rocksdb_max_background_compactions;
// init rocksdb::ColumnFamilyOptions for data column family
+ // TODO(yingchun): size_t, uint64_t
_data_cf_opts.write_buffer_size =
(size_t)dsn_config_get_value_uint64("pegasus.server",
"rocksdb_write_buffer_size",
@@ -313,7 +322,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_data_cf_opts.max_write_buffer_number =
FLAGS_rocksdb_max_write_buffer_number;
_data_cf_opts.num_levels = FLAGS_rocksdb_num_levels;
-
+ // TODO(yingchun): size_t, uint64_t
_data_cf_opts.target_file_size_base =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_target_file_size_base",
@@ -321,7 +330,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rocksdb options.target_file_size_base");
_data_cf_opts.target_file_size_multiplier =
FLAGS_rocksdb_target_file_size_multiplier;
-
+ // TODO(yingchun): size_t, uint64_t
_data_cf_opts.max_bytes_for_level_base =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_max_bytes_for_level_base",
@@ -581,18 +590,10 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rocksdb_periodic_compaction_seconds",
0,
"periodic_compaction_seconds, 0 means no
periodic compaction");
+ _checkpoint_reserve_min_count = FLAGS_checkpoint_reserve_min_count;
+ _checkpoint_reserve_time_seconds = FLAGS_checkpoint_reserve_time_seconds;
- // get the checkpoint reserve options.
- _checkpoint_reserve_min_count_in_config =
(uint32_t)dsn_config_get_value_uint64(
- "pegasus.server", "checkpoint_reserve_min_count", 2,
"checkpoint_reserve_min_count");
- _checkpoint_reserve_min_count = _checkpoint_reserve_min_count_in_config;
- _checkpoint_reserve_time_seconds_in_config =
- (uint32_t)dsn_config_get_value_uint64("pegasus.server",
-
"checkpoint_reserve_time_seconds",
- 1800,
-
"checkpoint_reserve_time_seconds, 0 means no check");
- _checkpoint_reserve_time_seconds =
_checkpoint_reserve_time_seconds_in_config;
-
+ // TODO(yingchun): signed integral type of at least 35 bits, int64_t
_update_rdb_stat_interval =
std::chrono::seconds(dsn_config_get_value_uint64(
"pegasus.server", "update_rdb_stat_interval", 60,
"update_rdb_stat_interval, in seconds"));
diff --git a/src/test/kill_test/data_verifier.cpp
b/src/test/kill_test/data_verifier.cpp
index 6880de2bb..6fcc22470 100644
--- a/src/test/kill_test/data_verifier.cpp
+++ b/src/test/kill_test/data_verifier.cpp
@@ -46,6 +46,7 @@
#include "pegasus/client.h"
#include "data_verifier.h"
#include "utils/fmt_logging.h"
+#include "utils/flags.h"
using namespace std;
using namespace ::pegasus;
@@ -53,11 +54,8 @@ using namespace ::pegasus;
static pegasus_client *client = nullptr;
static string app_name;
static string pegasus_cluster_name;
-static uint32_t set_and_get_timeout_milliseconds;
-static int set_thread_count = 0;
static std::atomic_llong set_next(0);
-static int get_thread_count = 0;
static std::vector<long long> set_thread_setting_id;
static const char *set_next_key = "set_next";
@@ -73,6 +71,16 @@ static const long stat_p999_pos = stat_batch - stat_batch /
1000 - 1;
static const long stat_p9999_pos = stat_batch - stat_batch / 10000 - 1;
static const long stat_max_pos = stat_batch - 1;
+DSN_DEFINE_uint32(pegasus.killtest,
+ set_and_get_timeout_milliseconds,
+ 3000,
+ "set() and get() timeout in milliseconds.");
+DSN_DEFINE_uint32(pegasus.killtest, set_thread_count, 5, "Thread count of the
setter.");
+DSN_DEFINE_uint32(pegasus.killtest,
+ get_thread_count,
+ FLAGS_set_thread_count * 4,
+ "Thread count of the getter.");
+
// return time in us.
long get_time()
{
@@ -84,7 +92,7 @@ long get_time()
long long get_min_thread_setting_id()
{
long long id = set_thread_setting_id[0];
- for (int i = 1; i < set_thread_count; ++i) {
+ for (int i = 1; i < FLAGS_set_thread_count; ++i) {
if (set_thread_setting_id[i] < id)
id = set_thread_setting_id[i];
}
@@ -115,8 +123,8 @@ void do_set(int thread_id)
value.assign(buf);
}
pegasus_client::internal_info info;
- int ret =
- client->set(hash_key, sort_key, value,
set_and_get_timeout_milliseconds, 0, &info);
+ int ret = client->set(
+ hash_key, sort_key, value, FLAGS_set_and_get_timeout_milliseconds,
0, &info);
if (ret == PERR_OK) {
long cur_time = get_time();
LOG_INFO("SetThread[{}]: set succeed: id={}, try={}, time={}
(gpid={}.{}, decree={}, "
@@ -197,8 +205,8 @@ void do_get_range(int thread_id, int round_id, long long
start_id, long long end
}
pegasus_client::internal_info info;
std::string get_value;
- int ret =
- client->get(hash_key, sort_key, get_value,
set_and_get_timeout_milliseconds, &info);
+ int ret = client->get(
+ hash_key, sort_key, get_value,
FLAGS_set_and_get_timeout_milliseconds, &info);
if (ret == PERR_OK || ret == PERR_NOT_FOUND) {
long cur_time = get_time();
if (ret == PERR_NOT_FOUND) {
@@ -317,7 +325,7 @@ void do_check(int thread_count)
while (true) {
char buf[1024];
sprintf(buf, "%lld", range_end);
- int ret = client->set(check_max_key, "", buf,
set_and_get_timeout_milliseconds);
+ int ret = client->set(check_max_key, "", buf,
FLAGS_set_and_get_timeout_milliseconds);
if (ret == PERR_OK) {
LOG_INFO("CheckThread: round({}): update \"{}\" succeed:
check_max={}",
round_id,
@@ -354,7 +362,7 @@ void do_mark()
}
sprintf(buf, "%lld", new_id);
value.assign(buf);
- int ret = client->set(set_next_key, "", value,
set_and_get_timeout_milliseconds);
+ int ret = client->set(set_next_key, "", value,
FLAGS_set_and_get_timeout_milliseconds);
if (ret == PERR_OK) {
long cur_time = get_time();
LOG_INFO("MarkThread: update \"{}\" succeed: set_next={}, time={}",
@@ -392,13 +400,6 @@ void verifier_initialize(const char *config_file)
LOG_ERROR("Initialize the _client failed");
exit(-1);
}
-
- set_and_get_timeout_milliseconds = (uint32_t)dsn_config_get_value_uint64(
- section, "set_and_get_timeout_milliseconds", 3000, "set and get
timeout milliseconds");
- set_thread_count =
- (uint32_t)dsn_config_get_value_uint64(section, "set_thread_count", 5,
"set thread count");
- get_thread_count = (uint32_t)dsn_config_get_value_uint64(
- section, "get_thread_count", set_thread_count * 4, "get thread count");
}
void verifier_start()
@@ -406,7 +407,8 @@ void verifier_start()
// check the set_next
while (true) {
std::string set_next_value;
- int ret = client->get(set_next_key, "", set_next_value,
set_and_get_timeout_milliseconds);
+ int ret =
+ client->get(set_next_key, "", set_next_value,
FLAGS_set_and_get_timeout_milliseconds);
if (ret == PERR_OK) {
long long i = atoll(set_next_value.c_str());
if (i == 0 && !set_next_value.empty()) {
@@ -427,17 +429,17 @@ void verifier_start()
client->get_error_string(ret));
}
}
- set_thread_setting_id.resize(set_thread_count);
+ set_thread_setting_id.resize(FLAGS_set_thread_count);
std::vector<std::thread> set_threads;
- for (int i = 0; i < set_thread_count; ++i) {
+ for (int i = 0; i < FLAGS_set_thread_count; ++i) {
set_threads.emplace_back(do_set, i);
}
std::thread mark_thread(do_mark);
// start several threads to read data from pegasus cluster and check data
correctness,
// block until the check failed
- do_check(get_thread_count);
+ do_check(FLAGS_get_thread_count);
mark_thread.join();
for (auto &t : set_threads) {
diff --git a/src/test/kill_test/kill_testor.cpp
b/src/test/kill_test/kill_testor.cpp
index 9ab1cab49..ac920e5dd 100644
--- a/src/test/kill_test/kill_testor.cpp
+++ b/src/test/kill_test/kill_testor.cpp
@@ -40,9 +40,12 @@
#include "kill_testor.h"
#include "killer_handler.h"
#include "killer_handler_shell.h"
+#include "utils/flags.h"
namespace pegasus {
namespace test {
+DSN_DEFINE_uint32(pegasus.killtest, kill_interval_seconds, 30, "");
+DSN_DEFINE_uint32(pegasus.killtest, max_seconds_for_all_partitions_to_recover,
600, "");
kill_testor::kill_testor(const char *config_file)
{
@@ -75,11 +78,6 @@ kill_testor::kill_testor(const char *config_file)
LOG_ERROR("Initialize the _ddl_client failed");
exit(-1);
}
-
- kill_interval_seconds =
- (uint32_t)dsn_config_get_value_uint64(section,
"kill_interval_seconds", 30, "");
- max_seconds_for_partitions_recover = (uint32_t)dsn_config_get_value_uint64(
- section, "max_seconds_for_all_partitions_to_recover", 600, "");
srand((unsigned)time(nullptr));
}
@@ -166,10 +164,11 @@ bool kill_testor::check_cluster_status()
int healthy_partition_cnt = 0;
int unhealthy_partition_cnt = 0;
int try_count = 1;
- while (try_count <= max_seconds_for_partitions_recover) {
- dsn::error_code err = get_partition_info(try_count ==
max_seconds_for_partitions_recover,
- healthy_partition_cnt,
- unhealthy_partition_cnt);
+ while (try_count <= FLAGS_max_seconds_for_all_partitions_to_recover) {
+ dsn::error_code err =
+ get_partition_info(try_count ==
FLAGS_max_seconds_for_all_partitions_to_recover,
+ healthy_partition_cnt,
+ unhealthy_partition_cnt);
if (err == dsn::ERR_OK) {
if (unhealthy_partition_cnt > 0) {
LOG_DEBUG("query partition status success, but still have
unhealthy partition, "
diff --git a/src/test/kill_test/kill_testor.h b/src/test/kill_test/kill_testor.h
index 533a45b41..0ce6cf364 100644
--- a/src/test/kill_test/kill_testor.h
+++ b/src/test/kill_test/kill_testor.h
@@ -63,10 +63,6 @@ protected:
vector<dsn::rpc_address> meta_list;
std::vector<partition_configuration> partitions;
-
- int kill_interval_seconds;
- uint32_t _sleep_time_before_recover_seconds;
- uint32_t max_seconds_for_partitions_recover;
};
} // namespace test
} // namespace pegasus
diff --git a/src/test/kill_test/partition_kill_testor.cpp
b/src/test/kill_test/partition_kill_testor.cpp
index 8d6e06fd9..2f86f0483 100644
--- a/src/test/kill_test/partition_kill_testor.cpp
+++ b/src/test/kill_test/partition_kill_testor.cpp
@@ -31,9 +31,13 @@
#include "remote_cmd/remote_command.h"
#include "partition_kill_testor.h"
+#include "utils/flags.h"
namespace pegasus {
namespace test {
+
+DSN_DECLARE_uint32(kill_interval_seconds);
+
partition_kill_testor::partition_kill_testor(const char *config_file) :
kill_testor(config_file) {}
void partition_kill_testor::Run()
@@ -45,8 +49,8 @@ void partition_kill_testor::Run()
} else {
run();
}
- LOG_INFO("sleep {} seconds before checking", kill_interval_seconds);
- sleep(kill_interval_seconds);
+ LOG_INFO("sleep {} seconds before checking",
FLAGS_kill_interval_seconds);
+ sleep(FLAGS_kill_interval_seconds);
}
}
diff --git a/src/test/kill_test/process_kill_testor.cpp
b/src/test/kill_test/process_kill_testor.cpp
index 7c6b9c6eb..8dd25a76b 100644
--- a/src/test/kill_test/process_kill_testor.cpp
+++ b/src/test/kill_test/process_kill_testor.cpp
@@ -43,22 +43,25 @@
namespace pegasus {
namespace test {
-DSN_DEFINE_int32(section, total_meta_count, 0, "total meta count");
-DSN_DEFINE_int32(section, total_replica_count, 0, "total replica count");
-DSN_DEFINE_int32(section, total_zookeeper_count, 0, "total zookeeper count");
-DSN_DEFINE_int32(section,
+DSN_DEFINE_int32(pegasus.killtest, total_meta_count, 0, "total meta count");
+DSN_DEFINE_int32(pegasus.killtest, total_replica_count, 0, "total replica
count");
+DSN_DEFINE_int32(pegasus.killtest, total_zookeeper_count, 0, "total zookeeper
count");
+DSN_DEFINE_int32(pegasus.killtest,
kill_replica_max_count,
FLAGS_total_replica_count,
"replica killed max count");
-DSN_DEFINE_int32(section, kill_meta_max_count, FLAGS_total_meta_count, "meta
killed max count");
-DSN_DEFINE_int32(section,
+DSN_DEFINE_int32(pegasus.killtest,
+ kill_meta_max_count,
+ FLAGS_total_meta_count,
+ "meta killed max count");
+DSN_DEFINE_int32(pegasus.killtest,
kill_zookeeper_max_count,
FLAGS_total_zookeeper_count,
"zookeeper killed max count");
DSN_DEFINE_group_validator(kill_test_role_count, [](std::string &message) ->
bool {
if (FLAGS_total_meta_count == 0 && FLAGS_total_replica_count == 0 &&
FLAGS_total_zookeeper_count == 0) {
- message = fmt::format("[section].total_meta_count, total_replica_count
and "
+ message = fmt::format("[pegasus.killtest].total_meta_count,
total_replica_count and "
"total_zookeeper_count should not all be 0.");
return false;
}
@@ -66,24 +69,28 @@ DSN_DEFINE_group_validator(kill_test_role_count,
[](std::string &message) -> boo
return true;
});
+DSN_DEFINE_uint32(pegasus.killtest,
+ sleep_time_before_recover_seconds,
+ 30,
+ "sleep time before recover seconds");
+
+DSN_DECLARE_uint32(kill_interval_seconds);
+
process_kill_testor::process_kill_testor(const char *config_file) :
kill_testor(config_file)
{
register_kill_handlers();
- const char *section = "pegasus.killtest";
kill_round = 0;
// initialize killer_handler
std::string killer_name =
- dsn_config_get_value_string(section, "killer_handler", "", "killer
handler");
+ dsn_config_get_value_string("pegasus.killtest", "killer_handler", "",
"killer handler");
CHECK(!killer_name.empty(), "");
_killer_handler.reset(killer_handler::new_handler(killer_name.c_str()));
CHECK(_killer_handler, "invalid killer_name({})", killer_name);
_job_types = {META, REPLICA, ZOOKEEPER};
_job_index_to_kill.resize(JOB_LENGTH);
- _sleep_time_before_recover_seconds = (uint32_t)dsn_config_get_value_uint64(
- section, "sleep_time_before_recover_seconds", 30, "sleep time before
recover seconds");
}
process_kill_testor::~process_kill_testor() {}
@@ -112,8 +119,8 @@ void process_kill_testor::Run()
stop_verifier_and_exit("the verifier process is dead");
}
run();
- LOG_INFO("sleep {} seconds before checking", kill_interval_seconds);
- sleep(kill_interval_seconds);
+ LOG_INFO("sleep {} seconds before checking",
FLAGS_kill_interval_seconds);
+ sleep(FLAGS_kill_interval_seconds);
}
}
@@ -148,7 +155,8 @@ void process_kill_testor::run()
stop_verifier_and_exit("kill jobs failed");
}
- auto sleep_time_random_seconds = generate_one_number(1,
_sleep_time_before_recover_seconds);
+ auto sleep_time_random_seconds =
+ generate_one_number(1, FLAGS_sleep_time_before_recover_seconds);
LOG_INFO("sleep {} seconds before recovery", sleep_time_random_seconds);
sleep(sleep_time_random_seconds);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]