This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d83ce3d12 refactor(conf): use DSN_DEFINE_int32 to load int32 type of
configs (#1324)
d83ce3d12 is described below
commit d83ce3d127c9ed6461fba9b0317406cac5dc7786
Author: Yingchun Lai <[email protected]>
AuthorDate: Sun Jan 29 12:08:05 2023 +0800
refactor(conf): use DSN_DEFINE_int32 to load int32 type of configs (#1324)
---
src/block_service/test/fds_service_test.cpp | 4 +-
src/common/replication_common.cpp | 241 +++++---------------------
src/common/replication_common.h | 38 +---
src/meta/meta_backup_service.cpp | 10 +-
src/meta/meta_service.cpp | 22 ++-
src/meta/test/backup_test.cpp | 6 +-
src/replica/backup/replica_backup_manager.cpp | 19 +-
src/replica/mutation_cache.h | 2 +-
src/replica/replica.cpp | 28 ++-
src/replica/replica_2pc.cpp | 52 ++++--
src/replica/replica_check.cpp | 9 +-
src/replica/replica_chkpt.cpp | 29 +++-
src/replica/replica_init.cpp | 30 ++--
src/replica/replica_learn.cpp | 21 ++-
src/replica/replica_stub.cpp | 116 ++++++++-----
src/replica/split/replica_split_manager.cpp | 4 +-
src/replica/test/mock_utils.h | 4 +-
src/server/pegasus_server_impl_init.cpp | 138 +++++++--------
src/shell/commands/data_operations.cpp | 15 +-
src/test/bench_test/benchmark.cpp | 34 ++--
src/test/bench_test/config.cpp | 26 ++-
src/test/bench_test/config.h | 10 --
22 files changed, 395 insertions(+), 463 deletions(-)
diff --git a/src/block_service/test/fds_service_test.cpp
b/src/block_service/test/fds_service_test.cpp
index ea47cc019..6bd1fa081 100644
--- a/src/block_service/test/fds_service_test.cpp
+++ b/src/block_service/test/fds_service_test.cpp
@@ -26,6 +26,7 @@
#include "block_service/block_service.h"
#include "utils/filesystem.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
#include "utils/safe_strerror_posix.h"
@@ -672,7 +673,8 @@ TEST_F(FDSClientTest, test_concurrent_upload_download)
_service->initialize(init_str);
- int total_files = dsn_config_get_value_uint64("fds_concurrent_test",
"total_files", 64, "");
+ DSN_DEFINE_int32(fds_concurrent_test, total_files, 64, "");
+ int total_files = FLAGS_total_files;
unsigned long min_size =
dsn_config_get_value_uint64("fds_concurrent_test", "min_size", 64, "");
unsigned long max_size =
dsn_config_get_value_uint64("fds_concurrent_test", "min_size", 64, "");
diff --git a/src/common/replication_common.cpp
b/src/common/replication_common.cpp
index be67dfb99..f2ff1a410 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -42,6 +42,41 @@ DSN_DEFINE_int32(replication,
5,
"concurrent bulk load downloading replica count");
+DSN_DEFINE_int32(replication,
+ mutation_2pc_min_replica_count,
+ 2,
+ "minimum number of alive replicas under which write is
allowed. it's valid if "
+ "larger than 0, otherwise, the final value is based on
app_max_replica_count");
+DSN_DEFINE_int32(
+ replication,
+ gc_interval_ms,
+ 30 * 1000,
+ "every what period (ms) we do garbage collection for dead replicas,
on-disk state, log, etc.");
+DSN_DEFINE_int32(replication,
+ fd_check_interval_seconds,
+ 2,
+ "every this period(seconds) the FD will check healthness of
remote peers");
+DSN_DEFINE_int32(replication,
+ fd_beacon_interval_seconds,
+ 3,
+ "every this period(seconds) the FD sends beacon message to
remote peers");
+DSN_DEFINE_int32(replication, fd_lease_seconds, 9, "lease (seconds) get from
remote FD master");
+DSN_DEFINE_int32(replication,
+ fd_grace_seconds,
+ 10,
+ "grace (seconds) assigned to remote FD slaves (grace >
lease)");
+
+// TODO(yingchun): useless any more, remove it from all config files later.
+// DSN_DEFINE_int32(replication,
+// log_shared_batch_buffer_kb,
+// 0,
+// "shared log buffer size (KB) for batching incoming
logs");
+
+DSN_DEFINE_int32(replication,
+ cold_backup_checkpoint_reserve_minutes,
+ 10,
+ "reserve minutes of cold backup checkpoint");
+
/**
 * Empty write is used for flushing WAL log entry which is submitted
asynchronously.
 * Make sure it can work well if you disable it.
@@ -60,58 +95,24 @@ replication_options::replication_options()
delay_for_fd_timeout_on_start = false;
duplication_enabled = true;
- prepare_timeout_ms_for_secondaries = 1000;
- prepare_timeout_ms_for_potential_secondaries = 3000;
- prepare_decree_gap_for_debug_logging = 10000;
-
batch_write_disabled = false;
- staleness_for_commit = 10;
- max_mutation_count_in_prepare_list = 110;
- mutation_2pc_min_replica_count = 2;
group_check_disabled = false;
- group_check_interval_ms = 10000;
checkpoint_disabled = false;
- checkpoint_interval_seconds = 100;
checkpoint_min_decree_gap = 10000;
- checkpoint_max_interval_hours = 2;
gc_disabled = false;
- gc_interval_ms = 30 * 1000; // 30 seconds
- gc_memory_replica_interval_ms = 10 * 60 * 1000; // 10 minutes
disk_stat_disabled = false;
- disk_stat_interval_seconds = 600;
fd_disabled = false;
- fd_check_interval_seconds = 2;
- fd_beacon_interval_seconds = 3;
- fd_lease_seconds = 9;
- fd_grace_seconds = 10;
-
- log_private_file_size_mb = 32;
- log_private_reserve_max_size_mb = 0;
- log_private_reserve_max_time_seconds = 0;
-
- log_shared_file_size_mb = 32;
- log_shared_file_count_limit = 100;
- log_shared_batch_buffer_kb = 0;
+
log_shared_force_flush = false;
- log_shared_pending_size_throttling_threshold_kb = 0;
- log_shared_pending_size_throttling_delay_ms = 0;
config_sync_disabled = false;
mem_release_enabled = true;
- mem_release_check_interval_ms = 3600000;
- mem_release_max_reserved_mem_percentage = 10;
-
- lb_interval_ms = 10000;
-
- learn_app_max_concurrent_count = 5;
-
- cold_backup_checkpoint_reserve_minutes = 10;
}
replication_options::~replication_options() {}
@@ -193,167 +194,41 @@ void replication_options::initialize()
duplication_enabled = dsn_config_get_value_bool(
"replication", "duplication_enabled", duplication_enabled, "is
duplication enabled");
- prepare_timeout_ms_for_secondaries = (int)dsn_config_get_value_uint64(
- "replication",
- "prepare_timeout_ms_for_secondaries",
- prepare_timeout_ms_for_secondaries,
- "timeout (ms) for prepare message to secondaries in two phase commit");
- prepare_timeout_ms_for_potential_secondaries =
(int)dsn_config_get_value_uint64(
- "replication",
- "prepare_timeout_ms_for_potential_secondaries",
- prepare_timeout_ms_for_potential_secondaries,
- "timeout (ms) for prepare message to potential secondaries in two
phase commit");
- prepare_decree_gap_for_debug_logging = (int)dsn_config_get_value_uint64(
- "replication",
- "prepare_decree_gap_for_debug_logging",
- prepare_decree_gap_for_debug_logging,
- "if greater than 0, then print debug log every decree gap of
preparing");
-
batch_write_disabled =
dsn_config_get_value_bool("replication",
"batch_write_disabled",
batch_write_disabled,
"whether to disable auto-batch of replicated
write requests");
- staleness_for_commit =
- (int)dsn_config_get_value_uint64("replication",
- "staleness_for_commit",
- staleness_for_commit,
- "how many concurrent two phase commit
rounds are allowed");
- max_mutation_count_in_prepare_list =
- (int)dsn_config_get_value_uint64("replication",
- "max_mutation_count_in_prepare_list",
- max_mutation_count_in_prepare_list,
- "maximum number of mutations in
prepare list");
- mutation_2pc_min_replica_count = (int)dsn_config_get_value_uint64(
- "replication",
- "mutation_2pc_min_replica_count",
- mutation_2pc_min_replica_count,
- "minimum number of alive replicas under which write is allowed. it's
valid if larger than "
- "0, otherwise, the final value is based on app_max_replica_count");
group_check_disabled = dsn_config_get_value_bool("replication",
"group_check_disabled",
group_check_disabled,
"whether group check is
disabled");
- group_check_interval_ms =
- (int)dsn_config_get_value_uint64("replication",
- "group_check_interval_ms",
- group_check_interval_ms,
- "every what period (ms) we check the
replica healthness");
checkpoint_disabled = dsn_config_get_value_bool("replication",
"checkpoint_disabled",
checkpoint_disabled,
"whether checkpoint is
disabled");
- checkpoint_interval_seconds = (int)dsn_config_get_value_uint64(
- "replication",
- "checkpoint_interval_seconds",
- checkpoint_interval_seconds,
- "every what period (seconds) we do checkpoints for replicated apps");
checkpoint_min_decree_gap =
(int64_t)dsn_config_get_value_uint64("replication",
"checkpoint_min_decree_gap",
checkpoint_min_decree_gap,
"minimum decree gap that triggers
checkpoint");
- checkpoint_max_interval_hours = (int)dsn_config_get_value_uint64(
- "replication",
- "checkpoint_max_interval_hours",
- checkpoint_max_interval_hours,
- "maximum time interval (hours) where a new checkpoint must be
created");
gc_disabled = dsn_config_get_value_bool(
"replication", "gc_disabled", gc_disabled, "whether to disable garbage
collection");
- gc_interval_ms = (int)dsn_config_get_value_uint64("replication",
- "gc_interval_ms",
- gc_interval_ms,
- "every what period (ms)
we do garbage "
- "collection for dead
replicas, on-disk "
- "state, log, etc.");
- gc_memory_replica_interval_ms = (int)dsn_config_get_value_uint64(
- "replication",
- "gc_memory_replica_interval_ms",
- gc_memory_replica_interval_ms,
- "after closing a healthy replica (due to LB), the replica will remain
in memory for this "
- "long (ms) for quick recover");
disk_stat_disabled = dsn_config_get_value_bool(
"replication", "disk_stat_disabled", disk_stat_disabled, "whether to
disable disk stat");
- disk_stat_interval_seconds =
- (int)dsn_config_get_value_uint64("replication",
- "disk_stat_interval_seconds",
- disk_stat_interval_seconds,
- "every what period (ms) we do disk
stat");
fd_disabled = dsn_config_get_value_bool(
"replication", "fd_disabled", fd_disabled, "whether to disable failure
detection");
- fd_check_interval_seconds = (int)dsn_config_get_value_uint64(
- "replication",
- "fd_check_interval_seconds",
- fd_check_interval_seconds,
- "every this period(seconds) the FD will check healthness of remote
peers");
- fd_beacon_interval_seconds = (int)dsn_config_get_value_uint64(
- "replication",
- "fd_beacon_interval_seconds",
- fd_beacon_interval_seconds,
- "every this period(seconds) the FD sends beacon message to remote
peers");
- fd_lease_seconds =
- (int)dsn_config_get_value_uint64("replication",
- "fd_lease_seconds",
- fd_lease_seconds,
- "lease (seconds) get from remote FD
master");
- fd_grace_seconds = (int)dsn_config_get_value_uint64(
- "replication",
- "fd_grace_seconds",
- fd_grace_seconds,
- "grace (seconds) assigned to remote FD slaves (grace > lease)");
-
- log_private_file_size_mb =
- (int)dsn_config_get_value_uint64("replication",
- "log_private_file_size_mb",
- log_private_file_size_mb,
- "private log maximum segment file
size (MB)");
- // ATTENTION: only when log_private_reserve_max_size_mb and
log_private_reserve_max_time_seconds
- // are both satisfied, the useless logs can be reserved.
- log_private_reserve_max_size_mb =
- (int)dsn_config_get_value_uint64("replication",
- "log_private_reserve_max_size_mb",
- log_private_reserve_max_size_mb,
- "max size of useless private log to
be reserved");
- log_private_reserve_max_time_seconds = (int)dsn_config_get_value_uint64(
- "replication",
- "log_private_reserve_max_time_seconds",
- log_private_reserve_max_time_seconds,
- "max time in seconds of useless private log to be reserved");
-
- log_shared_file_size_mb =
- (int)dsn_config_get_value_uint64("replication",
- "log_shared_file_size_mb",
- log_shared_file_size_mb,
- "shared log maximum segment file size
(MB)");
- log_shared_file_count_limit =
(int)dsn_config_get_value_uint64("replication",
-
"log_shared_file_count_limit",
-
log_shared_file_count_limit,
- "shared log
maximum file count");
- log_shared_batch_buffer_kb =
- (int)dsn_config_get_value_uint64("replication",
- "log_shared_batch_buffer_kb",
- log_shared_batch_buffer_kb,
- "shared log buffer size (KB) for
batching incoming logs");
+
log_shared_force_flush =
dsn_config_get_value_bool("replication",
"log_shared_force_flush",
log_shared_force_flush,
"when write shared log, whether to flush
file after write done");
- log_shared_pending_size_throttling_threshold_kb =
- (int)dsn_config_get_value_uint64("replication",
-
"log_shared_pending_size_throttling_threshold_kb",
-
log_shared_pending_size_throttling_threshold_kb,
-
"log_shared_pending_size_throttling_threshold_kb");
- log_shared_pending_size_throttling_delay_ms =
- (int)dsn_config_get_value_uint64("replication",
-
"log_shared_pending_size_throttling_delay_ms",
-
log_shared_pending_size_throttling_delay_ms,
-
"log_shared_pending_size_throttling_delay_ms");
config_sync_disabled = dsn_config_get_value_bool(
"replication",
@@ -366,57 +241,19 @@ void replication_options::initialize()
mem_release_enabled,
"whether to enable
periodic memory release");
- mem_release_check_interval_ms = (int)dsn_config_get_value_uint64(
- "replication",
- "mem_release_check_interval_ms",
- mem_release_check_interval_ms,
- "the replica check if should release memory to the system every this
period of time(ms)");
-
- mem_release_max_reserved_mem_percentage = (int)dsn_config_get_value_uint64(
- "replication",
- "mem_release_max_reserved_mem_percentage",
- mem_release_max_reserved_mem_percentage,
- "if tcmalloc reserved but not-used memory exceed this percentage of
application allocated "
- "memory, replica server will release the exceeding memory back to
operating system");
-
- lb_interval_ms = (int)dsn_config_get_value_uint64(
- "replication",
- "lb_interval_ms",
- lb_interval_ms,
- "every this period(ms) the meta server will do load balance");
-
- learn_app_max_concurrent_count =
- (int)dsn_config_get_value_uint64("replication",
- "learn_app_max_concurrent_count",
- learn_app_max_concurrent_count,
- "max count of learning app
concurrently");
-
cold_backup_root = dsn_config_get_value_string(
"replication", "cold_backup_root", "", "cold backup remote storage
path prefix");
- cold_backup_checkpoint_reserve_minutes =
- (int)dsn_config_get_value_uint64("replication",
-
"cold_backup_checkpoint_reserve_minutes",
-
cold_backup_checkpoint_reserve_minutes,
- "reserve minutes of cold backup
checkpoint");
-
max_concurrent_bulk_load_downloading_count =
FLAGS_max_concurrent_bulk_load_downloading_count;
CHECK(replica_helper::load_meta_servers(meta_servers), "invalid meta
server config");
-
- sanity_check();
-}
-
-void replication_options::sanity_check()
-{
- CHECK_GE(max_mutation_count_in_prepare_list, staleness_for_commit);
}
int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t
app_max_replica_count) const
{
CHECK_GT(app_max_replica_count, 0);
- if (mutation_2pc_min_replica_count > 0) { // >0 means use the user config
- return mutation_2pc_min_replica_count;
+ if (FLAGS_mutation_2pc_min_replica_count > 0) { // >0 means use the user
config
+ return FLAGS_mutation_2pc_min_replica_count;
} else { // otherwise, the value based on the table max_replica_count
return app_max_replica_count <= 2 ? 1 : app_max_replica_count / 2 + 1;
}
diff --git a/src/common/replication_common.h b/src/common/replication_common.h
index 29c72826a..970ad3a45 100644
--- a/src/common/replication_common.h
+++ b/src/common/replication_common.h
@@ -71,59 +71,26 @@ public:
bool delay_for_fd_timeout_on_start;
bool duplication_enabled;
- int32_t prepare_timeout_ms_for_secondaries;
- int32_t prepare_timeout_ms_for_potential_secondaries;
- int32_t prepare_decree_gap_for_debug_logging;
-
bool batch_write_disabled;
- int32_t staleness_for_commit;
- int32_t max_mutation_count_in_prepare_list;
- int32_t mutation_2pc_min_replica_count;
bool group_check_disabled;
- int32_t group_check_interval_ms;
bool checkpoint_disabled;
- int32_t checkpoint_interval_seconds;
int64_t checkpoint_min_decree_gap;
- int32_t checkpoint_max_interval_hours;
bool gc_disabled;
- int32_t gc_interval_ms;
- int32_t gc_memory_replica_interval_ms;
bool disk_stat_disabled;
- int32_t disk_stat_interval_seconds;
bool fd_disabled;
- int32_t fd_check_interval_seconds;
- int32_t fd_beacon_interval_seconds;
- int32_t fd_lease_seconds;
- int32_t fd_grace_seconds;
-
- int32_t log_private_file_size_mb;
- int32_t log_private_reserve_max_size_mb;
- int32_t log_private_reserve_max_time_seconds;
-
- int32_t log_shared_file_size_mb;
- int32_t log_shared_file_count_limit;
- int32_t log_shared_batch_buffer_kb;
+
bool log_shared_force_flush;
- int32_t log_shared_pending_size_throttling_threshold_kb;
- int32_t log_shared_pending_size_throttling_delay_ms;
bool config_sync_disabled;
bool mem_release_enabled;
- int32_t mem_release_check_interval_ms;
- int32_t mem_release_max_reserved_mem_percentage;
-
- int32_t lb_interval_ms;
-
- int32_t learn_app_max_concurrent_count;
std::string cold_backup_root;
- int32_t cold_backup_checkpoint_reserve_minutes;
int32_t max_concurrent_bulk_load_downloading_count;
@@ -143,9 +110,6 @@ public:
/*out*/ std::vector<std::string>
&dirs);
static bool check_if_in_black_list(const std::vector<std::string>
&black_list_dir,
const std::string &dir);
-
-private:
- void sanity_check();
};
} // namespace replication
} // namespace dsn
diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index f51778c97..c9448b40a 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -30,6 +30,9 @@
namespace dsn {
namespace replication {
+DSN_DECLARE_int32(cold_backup_checkpoint_reserve_minutes);
+DSN_DECLARE_int32(fd_lease_seconds);
+
// TODO: backup_service and policy_context should need two locks, its own
_lock and server_state's
// _lock this maybe lead to deadlock, should refactor this
@@ -1227,12 +1230,11 @@ void backup_service::add_backup_policy(dsn::message_ex
*msg)
// The backup interval must be greater than checkpoint reserve time.
// Or the next cold backup checkpoint may be cleared by the clear
operation.
- if (request.backup_interval_seconds <=
- _meta_svc->get_options().cold_backup_checkpoint_reserve_minutes * 60) {
+ if (request.backup_interval_seconds <=
FLAGS_cold_backup_checkpoint_reserve_minutes * 60) {
response.err = ERR_INVALID_PARAMETERS;
response.hint_message = fmt::format(
- "backup interval must be greater than
cold_backup_checkpoint_reserve_minutes={}",
- _meta_svc->get_options().cold_backup_checkpoint_reserve_minutes);
+ "backup interval must be greater than
FLAGS_cold_backup_checkpoint_reserve_minutes={}",
+ FLAGS_cold_backup_checkpoint_reserve_minutes);
_meta_svc->reply_data(msg, response);
msg->release_ref();
return;
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 3c2077d9f..11d6a4a77 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -58,6 +58,16 @@ DSN_TAG_VARIABLE(min_live_node_count_for_unfreeze,
FT_MUTABLE);
DSN_DEFINE_validator(min_live_node_count_for_unfreeze,
[](uint64_t min_live_node_count) -> bool { return
min_live_node_count > 0; });
+DSN_DEFINE_int32(replication,
+ lb_interval_ms,
+ 10000,
+ "every this period(ms) the meta server will do load balance");
+
+DSN_DECLARE_int32(fd_beacon_interval_seconds);
+DSN_DECLARE_int32(fd_check_interval_seconds);
+DSN_DECLARE_int32(fd_grace_seconds);
+DSN_DECLARE_int32(fd_lease_seconds);
+
meta_service::meta_service()
: serverlet("meta_service"), _failure_detector(nullptr), _started(false),
_recovering(false)
{
@@ -272,9 +282,9 @@ void meta_service::start_service()
tasking::enqueue_timer(LPC_META_STATE_NORMAL,
nullptr,
std::bind(&meta_service::balancer_run, this),
- std::chrono::milliseconds(_opts.lb_interval_ms),
+ std::chrono::milliseconds(FLAGS_lb_interval_ms),
server_state::sStateHash,
- std::chrono::milliseconds(_opts.lb_interval_ms));
+ std::chrono::milliseconds(FLAGS_lb_interval_ms));
if (!_meta_opts.cold_backup_disabled) {
LOG_INFO("start backup service");
@@ -309,10 +319,10 @@ error_code meta_service::start()
_failure_detector->set_allow_list(_meta_opts.replica_white_list);
_failure_detector->register_ctrl_commands();
- err = _failure_detector->start(_opts.fd_check_interval_seconds,
- _opts.fd_beacon_interval_seconds,
- _opts.fd_lease_seconds,
- _opts.fd_grace_seconds,
+ err = _failure_detector->start(FLAGS_fd_check_interval_seconds,
+ FLAGS_fd_beacon_interval_seconds,
+ FLAGS_fd_lease_seconds,
+ FLAGS_fd_grace_seconds,
_meta_opts.enable_white_list);
dreturn_not_ok_logged(err, "start failure_detector failed, err = {}", err);
diff --git a/src/meta/test/backup_test.cpp b/src/meta/test/backup_test.cpp
index 2792df524..0bba049cd 100644
--- a/src/meta/test/backup_test.cpp
+++ b/src/meta/test/backup_test.cpp
@@ -41,6 +41,8 @@
namespace dsn {
namespace replication {
+DSN_DECLARE_int32(cold_backup_checkpoint_reserve_minutes);
+
struct method_record
{
dsn::utils::notify_event event;
@@ -793,8 +795,8 @@ TEST_F(meta_backup_service_test, test_add_backup_policy)
fake_wait_rpc(r, resp);
std::string hint_message = fmt::format(
- "backup interval must be greater than
cold_backup_checkpoint_reserve_minutes={}",
- _meta_svc->get_options().cold_backup_checkpoint_reserve_minutes);
+ "backup interval must be greater than
FLAGS_cold_backup_checkpoint_reserve_minutes={}",
+ FLAGS_cold_backup_checkpoint_reserve_minutes);
ASSERT_EQ(ERR_INVALID_PARAMETERS, resp.err);
ASSERT_EQ(hint_message, resp.hint_message);
req.backup_interval_seconds = old_backup_interval_seconds;
diff --git a/src/replica/backup/replica_backup_manager.cpp
b/src/replica/backup/replica_backup_manager.cpp
index 1ab5348de..187e0187c 100644
--- a/src/replica/backup/replica_backup_manager.cpp
+++ b/src/replica/backup/replica_backup_manager.cpp
@@ -21,11 +21,15 @@
#include "utils/fmt_logging.h"
#include "utils/filesystem.h"
+#include "utils/flags.h"
#include "replica/replication_app_base.h"
namespace dsn {
namespace replication {
+DSN_DECLARE_int32(cold_backup_checkpoint_reserve_minutes);
+DSN_DECLARE_int32(gc_interval_ms);
+
// returns true if this checkpoint dir belongs to the policy
static bool is_policy_checkpoint(const std::string &chkpt_dirname, const
std::string &policy_name)
{
@@ -101,7 +105,7 @@ void replica_backup_manager::start_collect_backup_info()
tasking::enqueue_timer(LPC_PER_REPLICA_COLLECT_INFO_TIMER,
&_replica->_tracker,
[this]() { collect_backup_info(); },
-
std::chrono::milliseconds(_replica->options()->gc_interval_ms),
+
std::chrono::milliseconds(FLAGS_gc_interval_ms),
get_gpid().thread_hash());
}
}
@@ -145,13 +149,12 @@ void
replica_backup_manager::background_clear_backup_checkpoint(const std::strin
{
LOG_INFO_PREFIX("schedule to clear all checkpoint dirs of policy({}) after
{} minutes",
policy_name,
-
_replica->options()->cold_backup_checkpoint_reserve_minutes);
- tasking::enqueue(
- LPC_BACKGROUND_COLD_BACKUP,
- &_replica->_tracker,
- [this, policy_name]() { clear_backup_checkpoint(policy_name); },
- get_gpid().thread_hash(),
-
std::chrono::minutes(_replica->options()->cold_backup_checkpoint_reserve_minutes));
+ FLAGS_cold_backup_checkpoint_reserve_minutes);
+ tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
+ &_replica->_tracker,
+ [this, policy_name]() {
clear_backup_checkpoint(policy_name); },
+ get_gpid().thread_hash(),
+
std::chrono::minutes(FLAGS_cold_backup_checkpoint_reserve_minutes));
}
// clear all checkpoint dirs of the policy
diff --git a/src/replica/mutation_cache.h b/src/replica/mutation_cache.h
index de5461c9b..57ebec6f4 100644
--- a/src/replica/mutation_cache.h
+++ b/src/replica/mutation_cache.h
@@ -35,7 +35,7 @@ namespace dsn {
namespace replication {
// mutation_cache is an in-memory array that stores a limited number
-// (SEE replication_options::max_mutation_count_in_prepare_list) of mutation
log entries.
+// (SEE FLAGS_max_mutation_count_in_prepare_list) of mutation log entries.
//
// Inherited by: prepare_list
class mutation_cache
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 0d2df6b63..d19950735 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -51,6 +51,27 @@
namespace dsn {
namespace replication {
+DSN_DEFINE_int32(replication,
+ staleness_for_commit,
+ 10,
+ "how many concurrent two phase commit rounds are allowed");
+DSN_DEFINE_int32(replication,
+ max_mutation_count_in_prepare_list,
+ 110,
+ "maximum number of mutations in prepare list");
+DSN_DEFINE_group_validator(max_mutation_count_in_prepare_list, [](std::string
&message) -> bool {
+ if (FLAGS_max_mutation_count_in_prepare_list < FLAGS_staleness_for_commit)
{
+ message =
fmt::format("replication.max_mutation_count_in_prepare_list({}) should be >= "
+ "replication.staleness_for_commit({})",
+ FLAGS_max_mutation_count_in_prepare_list,
+ FLAGS_staleness_for_commit);
+ return false;
+ }
+ return true;
+});
+
+DSN_DECLARE_int32(checkpoint_max_interval_hours);
+
const std::string replica::kAppInfo = ".app-info";
replica::replica(replica_stub *stub,
@@ -62,8 +83,7 @@ replica::replica(replica_stub *stub,
: serverlet<replica>("replica"),
replica_base(gpid, fmt::format("{}@{}", gpid,
stub->_primary_address_str), app.app_name),
_app_info(app),
- _primary_states(
- gpid, stub->options().staleness_for_commit,
stub->options().batch_write_disabled),
+ _primary_states(gpid, FLAGS_staleness_for_commit,
stub->options().batch_write_disabled),
_potential_secondary_states(this),
_cold_backup_running_count(0),
_cold_backup_max_duration_time_ms(0),
@@ -155,7 +175,7 @@ replica::replica(replica_stub *stub,
void replica::update_last_checkpoint_generate_time()
{
_last_checkpoint_generate_time_ms = dsn_now_ms();
- uint64_t max_interval_ms = _options->checkpoint_max_interval_hours *
3600000UL;
+ uint64_t max_interval_ms = FLAGS_checkpoint_max_interval_hours * 3600000UL;
// use random trigger time to avoid flush peek
_next_checkpoint_interval_trigger_time_ms =
_last_checkpoint_generate_time_ms + rand::next_u64(max_interval_ms /
2, max_interval_ms);
@@ -177,7 +197,7 @@ void replica::init_state()
_prepare_list = dsn::make_unique<prepare_list>(
this,
0,
- _options->max_mutation_count_in_prepare_list,
+ FLAGS_max_mutation_count_in_prepare_list,
std::bind(&replica::execute_mutation, this, std::placeholders::_1));
_config.ballot = 0;
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 78ae1984b..3d97fd400 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -44,6 +44,30 @@ DSN_DEFINE_bool(replication,
"reject client write requests if disk status is space
insufficient");
DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE);
+DSN_DEFINE_int32(replication,
+ prepare_timeout_ms_for_secondaries,
+ 1000,
+ "timeout (ms) for prepare message to secondaries in two phase
commit");
+DSN_DEFINE_int32(replication,
+ prepare_timeout_ms_for_potential_secondaries,
+ 3000,
+ "timeout (ms) for prepare message to potential secondaries in
two phase commit");
+DSN_DEFINE_int32(replication,
+ prepare_decree_gap_for_debug_logging,
+ 10000,
+ "if greater than 0, then print debug log every decree gap of
preparing");
+DSN_DEFINE_int32(replication,
+ log_shared_pending_size_throttling_threshold_kb,
+ 0,
+ "log_shared_pending_size_throttling_threshold_kb");
+DSN_DEFINE_int32(replication,
+ log_shared_pending_size_throttling_delay_ms,
+ 0,
+ "log_shared_pending_size_throttling_delay_ms");
+
+DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
+DSN_DECLARE_int32(staleness_for_commit);
+
void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
_checker.only_one_thread_access();
@@ -177,8 +201,8 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
if (mu->data.header.decree == invalid_decree) {
mu->set_id(get_ballot(), _prepare_list->max_decree() + 1);
// print a debug log if necessary
- if (_options->prepare_decree_gap_for_debug_logging > 0 &&
- mu->get_decree() % _options->prepare_decree_gap_for_debug_logging
== 0)
+ if (FLAGS_prepare_decree_gap_for_debug_logging > 0 &&
+ mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0)
level = LOG_LEVEL_INFO;
mu->set_timestamp(_uniq_timestamp_us.next());
} else {
@@ -196,7 +220,7 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
mu->set_is_sync_to_child(_primary_states.sync_send_write_request);
// check bounded staleness
- if (mu->data.header.decree > last_committed_decree() +
_options->staleness_for_commit) {
+ if (mu->data.header.decree > last_committed_decree() +
FLAGS_staleness_for_commit) {
err = ERR_CAPACITY_EXCEEDED;
goto ErrOut;
}
@@ -245,7 +269,7 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
send_prepare_message(*it,
partition_status::PS_SECONDARY,
mu,
- _options->prepare_timeout_ms_for_secondaries,
+ FLAGS_prepare_timeout_ms_for_secondaries,
pop_all_committed_mutations);
}
@@ -256,7 +280,7 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
send_prepare_message(it->first,
partition_status::PS_POTENTIAL_SECONDARY,
mu,
-
_options->prepare_timeout_ms_for_potential_secondaries,
+
FLAGS_prepare_timeout_ms_for_potential_secondaries,
pop_all_committed_mutations,
it->second.signature);
count++;
@@ -285,10 +309,10 @@ void replica::init_prepare(mutation_ptr &mu, bool
reconciliation, bool pop_all_c
get_gpid().thread_hash(),
&pending_size);
CHECK_NOTNULL(mu->log_task(), "");
- if (_options->log_shared_pending_size_throttling_threshold_kb > 0 &&
- _options->log_shared_pending_size_throttling_delay_ms > 0 &&
- pending_size >=
_options->log_shared_pending_size_throttling_threshold_kb * 1024) {
- int delay_ms =
_options->log_shared_pending_size_throttling_delay_ms;
+ if (FLAGS_log_shared_pending_size_throttling_threshold_kb > 0 &&
+ FLAGS_log_shared_pending_size_throttling_delay_ms > 0 &&
+ pending_size >=
FLAGS_log_shared_pending_size_throttling_threshold_kb * 1024) {
+ int delay_ms = FLAGS_log_shared_pending_size_throttling_delay_ms;
for (dsn::message_ex *r : mu->client_requests) {
if (r && r->io_session->delay_recv(delay_ms)) {
LOG_WARNING("too large pending shared log ({}), delay
traffic from {} for {} "
@@ -477,10 +501,10 @@ void replica::on_prepare(dsn::message_ex *request)
if (partition_status::PS_POTENTIAL_SECONDARY == status() ||
partition_status::PS_SECONDARY == status()) {
CHECK_LE_MSG(mu->data.header.decree,
- last_committed_decree() +
_options->max_mutation_count_in_prepare_list,
- "last_committed_decree: {},
_options->max_mutation_count_in_prepare_list: {}",
+ last_committed_decree() +
FLAGS_max_mutation_count_in_prepare_list,
+ "last_committed_decree: {},
FLAGS_max_mutation_count_in_prepare_list: {}",
last_committed_decree(),
- _options->max_mutation_count_in_prepare_list);
+ FLAGS_max_mutation_count_in_prepare_list);
} else {
LOG_ERROR_PREFIX("mutation {} on_prepare failed as invalid replica
state, state = {}",
mu->name(),
@@ -646,8 +670,8 @@ void replica::on_prepare_reply(std::pair<mutation_ptr,
partition_status::type> p
// retry for INACTIVE or TRY_AGAIN if there is still time.
if (resp.err == ERR_INACTIVE_STATE || resp.err == ERR_TRY_AGAIN) {
int prepare_timeout_ms = (target_status ==
partition_status::PS_SECONDARY
- ?
_options->prepare_timeout_ms_for_secondaries
- :
_options->prepare_timeout_ms_for_potential_secondaries);
+ ?
FLAGS_prepare_timeout_ms_for_secondaries
+ :
FLAGS_prepare_timeout_ms_for_potential_secondaries);
int delay_time_ms = 5; // delay some time before retry to avoid
sending too frequently
if (mu->is_prepare_close_to_timeout(delay_time_ms + 2,
prepare_timeout_ms)) {
LOG_ERROR_PREFIX("mutation {} do not retry prepare to {} for
no enought time left, "
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index 8b794c74d..7329eb7d6 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -47,6 +47,11 @@
namespace dsn {
namespace replication {
+DSN_DEFINE_int32(replication,
+ group_check_interval_ms,
+ 10000,
+ "every what period (ms) we check the replica healthness");
+
DSN_DECLARE_bool(empty_write_disabled);
void replica::init_group_check()
@@ -65,7 +70,7 @@ void replica::init_group_check()
tasking::enqueue_timer(LPC_GROUP_CHECK,
&_tracker,
[this] { broadcast_group_check(); },
-
std::chrono::milliseconds(_options->group_check_interval_ms),
+
std::chrono::milliseconds(FLAGS_group_check_interval_ms),
get_gpid().thread_hash());
}
@@ -136,7 +141,7 @@ void replica::broadcast_group_check()
// send empty prepare when necessary
if (!FLAGS_empty_write_disabled &&
- dsn_now_ms() >= _primary_states.last_prepare_ts_ms +
_options->group_check_interval_ms) {
+ dsn_now_ms() >= _primary_states.last_prepare_ts_ms +
FLAGS_group_check_interval_ms) {
mutation_ptr mu = new_mutation(invalid_decree);
mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
init_prepare(mu, false);
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 02330238c..978eb3e76 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -48,6 +48,25 @@
namespace dsn {
namespace replication {
+DSN_DEFINE_int32(replication,
+ checkpoint_max_interval_hours,
+ 2,
                 "maximum time interval (hours) after which a new checkpoint must be
created");
+DSN_DEFINE_int32(replication,
+ log_private_reserve_max_size_mb,
+ 0,
+ "max size of useless private log to be reserved. NOTE: only
when "
+ "FLAGS_log_private_reserve_max_size_mb and "
+ "FLAGS_log_private_reserve_max_time_seconds are both
satisfied, the useless logs "
+ "can be reserved.");
+DSN_DEFINE_int32(replication,
+ log_private_reserve_max_time_seconds,
+ 0,
+ "max time in seconds of useless private log to be reserved.
NOTE: only when "
+ "FLAGS_log_private_reserve_max_size_mb and "
+ "FLAGS_log_private_reserve_max_time_seconds are both
satisfied, the useless logs "
+ "can be reserved.");
+
const std::string kCheckpointFolderPrefix /*NOLINT*/ = "checkpoint";
static std::string checkpoint_folder(int64_t decree)
@@ -62,10 +81,10 @@ void replica::on_checkpoint_timer()
if (dsn_now_ms() > _next_checkpoint_interval_trigger_time_ms) {
// we trigger emergency checkpoint if no checkpoint generated for a
long time
- LOG_INFO_PREFIX("trigger emergency checkpoint by
checkpoint_max_interval_hours, "
+ LOG_INFO_PREFIX("trigger emergency checkpoint by
FLAGS_checkpoint_max_interval_hours, "
"config_interval = {}h ({}ms), random_interval = {}ms",
- _options->checkpoint_max_interval_hours,
- _options->checkpoint_max_interval_hours * 3600000UL,
+ FLAGS_checkpoint_max_interval_hours,
+ FLAGS_checkpoint_max_interval_hours * 3600000UL,
_next_checkpoint_interval_trigger_time_ms -
_last_checkpoint_generate_time_ms);
init_checkpoint(true);
@@ -120,8 +139,8 @@ void replica::on_checkpoint_timer()
get_gpid(),
cleanable_decree,
valid_start_offset,
-
(int64_t)_options->log_private_reserve_max_size_mb * 1024 * 1024,
-
(int64_t)_options->log_private_reserve_max_time_seconds);
+
(int64_t)FLAGS_log_private_reserve_max_size_mb * 1024 * 1024,
+
(int64_t)FLAGS_log_private_reserve_max_time_seconds);
if (status() == partition_status::PS_PRIMARY)
_counter_private_log_size->set(_private_log->total_size() /
1000000);
diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp
index 8f726b535..80ef47e35 100644
--- a/src/replica/replica_init.cpp
+++ b/src/replica/replica_init.cpp
@@ -39,6 +39,16 @@
namespace dsn {
namespace replication {
+DSN_DEFINE_int32(replication,
+ checkpoint_interval_seconds,
+ 100,
+ "every what period (seconds) we do checkpoints for replicated
apps");
+
+DSN_DEFINE_int32(replication,
+ log_private_file_size_mb,
+ 32,
+ "private log maximum segment file size (MB)");
+
error_code replica::initialize_on_new()
{
// if (dsn::utils::filesystem::directory_exists(_dir) &&
@@ -245,8 +255,8 @@ error_code replica::init_app_and_prepare_list(bool
create_new)
_config.ballot = _app->init_info().init_ballot;
_prepare_list->reset(_app->last_committed_decree());
- _private_log = new mutation_log_private(
- log_dir, _options->log_private_file_size_mb, get_gpid(), this);
+ _private_log =
+ new mutation_log_private(log_dir,
FLAGS_log_private_file_size_mb, get_gpid(), this);
LOG_INFO_PREFIX("plog_dir = {}", log_dir);
// sync valid_start_offset between app and logs
@@ -331,8 +341,8 @@ error_code replica::init_app_and_prepare_list(bool
create_new)
"Fail to create directory {}",
log_dir);
- _private_log = new mutation_log_private(
- log_dir, _options->log_private_file_size_mb, get_gpid(), this);
+ _private_log =
+ new mutation_log_private(log_dir,
FLAGS_log_private_file_size_mb, get_gpid(), this);
LOG_INFO_PREFIX("plog_dir = {}", log_dir);
err = _private_log->open(nullptr, [this](error_code err) {
@@ -345,12 +355,12 @@ error_code replica::init_app_and_prepare_list(bool
create_new)
if (err == ERR_OK) {
if (_checkpoint_timer == nullptr &&
!_options->checkpoint_disabled) {
- _checkpoint_timer = tasking::enqueue_timer(
- LPC_PER_REPLICA_CHECKPOINT_TIMER,
- &_tracker,
- [this] { on_checkpoint_timer(); },
-
std::chrono::seconds(_options->checkpoint_interval_seconds),
- get_gpid().thread_hash());
+ _checkpoint_timer =
+ tasking::enqueue_timer(LPC_PER_REPLICA_CHECKPOINT_TIMER,
+ &_tracker,
+ [this] { on_checkpoint_timer(); },
+
std::chrono::seconds(FLAGS_checkpoint_interval_seconds),
+ get_gpid().thread_hash());
}
_backup_mgr->start_collect_backup_info();
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 8b83079a4..2465632ca 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -46,6 +46,13 @@
namespace dsn {
namespace replication {
+DSN_DEFINE_int32(replication,
+ learn_app_max_concurrent_count,
+ 5,
+ "max count of learning app concurrently");
+
+DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
+
void replica::init_learn(uint64_t signature)
{
_checker.only_one_thread_access();
@@ -172,16 +179,16 @@ void replica::init_learn(uint64_t signature)
}
if (_app->last_committed_decree() == 0 &&
- _stub->_learn_app_concurrent_count.load() >=
_options->learn_app_max_concurrent_count) {
+ _stub->_learn_app_concurrent_count.load() >=
FLAGS_learn_app_max_concurrent_count) {
LOG_WARNING_PREFIX(
"init_learn[{:#018x}]: learnee = {}, learn_duration = {} ms, need
to learn app "
"because app_committed_decree = 0, but
learn_app_concurrent_count({}) >= "
- "learn_app_max_concurrent_count({}), skip",
+ "FLAGS_learn_app_max_concurrent_count({}), skip",
_potential_secondary_states.learning_version,
_config.primary,
_potential_secondary_states.duration_ms(),
_stub->_learn_app_concurrent_count,
- _options->learn_app_max_concurrent_count);
+ FLAGS_learn_app_max_concurrent_count);
return;
}
@@ -674,15 +681,15 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
}
if (resp.type == learn_type::LT_APP) {
- if (++_stub->_learn_app_concurrent_count >
_options->learn_app_max_concurrent_count) {
+ if (++_stub->_learn_app_concurrent_count >
FLAGS_learn_app_max_concurrent_count) {
--_stub->_learn_app_concurrent_count;
LOG_WARNING_PREFIX(
"on_learn_reply[{:#018x}]: learnee = {},
learn_app_concurrent_count({}) >= "
- "learn_app_max_concurrent_count({}), skip this round",
+ "FLAGS_learn_app_max_concurrent_count({}), skip this round",
_potential_secondary_states.learning_version,
_config.primary,
_stub->_learn_app_concurrent_count,
- _options->learn_app_max_concurrent_count);
+ FLAGS_learn_app_max_concurrent_count);
_potential_secondary_states.learning_round_is_running = false;
return;
} else {
@@ -1443,7 +1450,7 @@ error_code
replica::apply_learned_state_from_private_log(learn_state &state)
// temp prepare list for learning purpose
prepare_list plist(this,
_app->last_committed_decree(),
- _options->max_mutation_count_in_prepare_list,
+ FLAGS_max_mutation_count_in_prepare_list,
[this, duplicating, step_back](mutation_ptr &mu) {
if (mu->data.header.decree ==
_app->last_committed_decree() + 1) {
// TODO: assign the returned error_code to err
and check it
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 06d4eb411..80300a987 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -85,6 +85,39 @@ DSN_DEFINE_uint32(
DSN_TAG_VARIABLE(config_sync_interval_ms, FT_MUTABLE);
DSN_DEFINE_validator(config_sync_interval_ms, [](uint32_t value) -> bool {
return value > 0; });
+DSN_DEFINE_int32(replication,
+ disk_stat_interval_seconds,
+ 600,
+                 "every what period (seconds) we do disk stat");
+DSN_DEFINE_int32(replication,
+ gc_memory_replica_interval_ms,
+ 10 * 60 * 1000,
+ "after closing a healthy replica (due to LB), the replica
will remain in memory "
+ "for this long (ms) for quick recover");
+DSN_DEFINE_int32(replication,
+ log_shared_file_size_mb,
+ 32,
+ "shared log maximum segment file size (MB)");
+
+DSN_DEFINE_int32(replication, log_shared_file_count_limit, 100, "shared log
maximum file count");
+DSN_DEFINE_int32(
+ replication,
+ mem_release_check_interval_ms,
+ 3600000,
+ "the replica check if should release memory to the system every this
period of time(ms)");
+DSN_DEFINE_int32(
+ replication,
+ mem_release_max_reserved_mem_percentage,
+ 10,
+ "if tcmalloc reserved but not-used memory exceed this percentage of
application allocated "
+ "memory, replica server will release the exceeding memory back to
operating system");
+
+DSN_DECLARE_int32(fd_beacon_interval_seconds);
+DSN_DECLARE_int32(fd_check_interval_seconds);
+DSN_DECLARE_int32(fd_grace_seconds);
+DSN_DECLARE_int32(fd_lease_seconds);
+DSN_DECLARE_int32(gc_interval_ms);
+
bool replica_stub::s_not_exit_on_log_failure = false;
replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
@@ -495,7 +528,7 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
_verbose_client_log = _options.verbose_client_log_on_start;
_verbose_commit_log = _options.verbose_commit_log_on_start;
_release_tcmalloc_memory = _options.mem_release_enabled;
- _mem_release_max_reserved_mem_percentage =
_options.mem_release_max_reserved_mem_percentage;
+ _mem_release_max_reserved_mem_percentage =
FLAGS_mem_release_max_reserved_mem_percentage;
_max_concurrent_bulk_load_downloading_count =
_options.max_concurrent_bulk_load_downloading_count;
@@ -518,7 +551,7 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
initialize_fs_manager(_options.data_dirs, _options.data_dir_tags);
_log = new mutation_log_shared(_options.slog_dir,
- _options.log_shared_file_size_mb,
+ FLAGS_log_shared_file_size_mb,
_options.log_shared_force_flush,
&_counter_shared_log_recent_write_size);
LOG_INFO("slog_dir = {}", _options.slog_dir);
@@ -645,7 +678,7 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
"remove directory {} failed",
_options.slog_dir);
_log = new mutation_log_shared(_options.slog_dir,
- _options.log_shared_file_size_mb,
+ FLAGS_log_shared_file_size_mb,
_options.log_shared_force_flush,
&_counter_shared_log_recent_write_size);
CHECK_EQ_MSG(_log->open(nullptr, [this](error_code err) {
this->handle_log_failure(err); }),
@@ -698,20 +731,20 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this] { on_gc(); },
- std::chrono::milliseconds(_options.gc_interval_ms),
+ std::chrono::milliseconds(FLAGS_gc_interval_ms),
0,
- std::chrono::milliseconds(rand::next_u32(0,
_options.gc_interval_ms)));
+ std::chrono::milliseconds(rand::next_u32(0,
FLAGS_gc_interval_ms)));
}
// disk stat
if (false == _options.disk_stat_disabled) {
- _disk_stat_timer_task = ::dsn::tasking::enqueue_timer(
- LPC_DISK_STAT,
- &_tracker,
- [this]() { on_disk_stat(); },
- std::chrono::seconds(_options.disk_stat_interval_seconds),
- 0,
- std::chrono::seconds(_options.disk_stat_interval_seconds));
+ _disk_stat_timer_task =
+ ::dsn::tasking::enqueue_timer(LPC_DISK_STAT,
+ &_tracker,
+ [this]() { on_disk_stat(); },
+
std::chrono::seconds(FLAGS_disk_stat_interval_seconds),
+ 0,
+
std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
}
// attach rps
@@ -729,7 +762,7 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
if (_options.delay_for_fd_timeout_on_start) {
uint64_t now_time_ms = dsn_now_ms();
uint64_t delay_time_ms =
- (_options.fd_grace_seconds + 3) * 1000; // for more 3 seconds than
grace seconds
+ (FLAGS_fd_grace_seconds + 3) * 1000; // for more 3 seconds than
grace seconds
if (now_time_ms < dsn::utils::process_start_millis() + delay_time_ms) {
uint64_t delay = dsn::utils::process_start_millis() +
delay_time_ms - now_time_ms;
LOG_INFO("delay for {} ms to make failure detector timeout",
delay);
@@ -803,9 +836,9 @@ void replica_stub::initialize_start()
tasking::enqueue_timer(LPC_MEM_RELEASE,
&_tracker,
std::bind(&replica_stub::gc_tcmalloc_memory,
this, false),
-
std::chrono::milliseconds(_options.mem_release_check_interval_ms),
+
std::chrono::milliseconds(FLAGS_mem_release_check_interval_ms),
0,
-
std::chrono::milliseconds(_options.mem_release_check_interval_ms));
+
std::chrono::milliseconds(FLAGS_mem_release_check_interval_ms));
#endif
if (_options.duplication_enabled) {
@@ -823,10 +856,10 @@ void replica_stub::initialize_start()
[this]() { this->on_meta_server_disconnected(); },
[this]() { this->on_meta_server_connected(); });
-
CHECK_EQ_MSG(_failure_detector->start(_options.fd_check_interval_seconds,
-
_options.fd_beacon_interval_seconds,
- _options.fd_lease_seconds,
- _options.fd_grace_seconds),
+ CHECK_EQ_MSG(_failure_detector->start(FLAGS_fd_check_interval_seconds,
+ FLAGS_fd_beacon_interval_seconds,
+ FLAGS_fd_lease_seconds,
+ FLAGS_fd_grace_seconds),
ERR_OK,
"FD start failed");
@@ -1534,7 +1567,7 @@ void
replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id)
replica->status() != partition_status::PS_PARTITION_SPLIT) {
if (replica->status() == partition_status::PS_INACTIVE &&
dsn_now_ms() - replica->create_time_milliseconds() <
- _options.gc_memory_replica_interval_ms) {
+ FLAGS_gc_memory_replica_interval_ms) {
LOG_INFO("{}: replica not exists on meta server, wait to close",
replica->name());
return;
}
@@ -1655,7 +1688,7 @@ void replica_stub::init_gc_for_test()
&_tracker,
[this] { on_gc(); },
0,
-
std::chrono::milliseconds(_options.gc_interval_ms));
+
std::chrono::milliseconds(FLAGS_gc_interval_ms));
}
void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
@@ -1782,22 +1815,23 @@ void replica_stub::on_gc()
std::set<gpid> prevent_gc_replicas;
int reserved_log_count = _log->garbage_collection(
- gc_condition, _options.log_shared_file_count_limit,
prevent_gc_replicas);
- if (reserved_log_count > _options.log_shared_file_count_limit * 2) {
- LOG_INFO("gc_shared: trigger emergency checkpoint by
log_shared_file_count_limit, "
- "file_count_limit = {}, reserved_log_count = {}, trigger
all replicas to do "
- "checkpoint",
- _options.log_shared_file_count_limit,
- reserved_log_count);
+ gc_condition, FLAGS_log_shared_file_count_limit,
prevent_gc_replicas);
+ if (reserved_log_count > FLAGS_log_shared_file_count_limit * 2) {
+ LOG_INFO(
+ "gc_shared: trigger emergency checkpoint by
FLAGS_log_shared_file_count_limit, "
+ "file_count_limit = {}, reserved_log_count = {}, trigger all
replicas to do "
+ "checkpoint",
+ FLAGS_log_shared_file_count_limit,
+ reserved_log_count);
for (auto &kv : rs) {
tasking::enqueue(
LPC_PER_REPLICA_CHECKPOINT_TIMER,
kv.second.rep->tracker(),
std::bind(&replica_stub::trigger_checkpoint, this,
kv.second.rep, true),
kv.first.thread_hash(),
- std::chrono::milliseconds(rand::next_u32(0,
_options.gc_interval_ms / 2)));
+ std::chrono::milliseconds(rand::next_u32(0,
FLAGS_gc_interval_ms / 2)));
}
- } else if (reserved_log_count > _options.log_shared_file_count_limit) {
+ } else if (reserved_log_count > FLAGS_log_shared_file_count_limit) {
std::ostringstream oss;
int c = 0;
for (auto &i : prevent_gc_replicas) {
@@ -1806,13 +1840,14 @@ void replica_stub::on_gc()
oss << i.to_string();
c++;
}
- LOG_INFO("gc_shared: trigger emergency checkpoint by
log_shared_file_count_limit, "
- "file_count_limit = {}, reserved_log_count = {},
prevent_gc_replica_count = "
- "{}, trigger them to do checkpoint: {}",
- _options.log_shared_file_count_limit,
- reserved_log_count,
- prevent_gc_replicas.size(),
- oss.str());
+ LOG_INFO(
+ "gc_shared: trigger emergency checkpoint by
FLAGS_log_shared_file_count_limit, "
+ "file_count_limit = {}, reserved_log_count = {},
prevent_gc_replica_count = "
+ "{}, trigger them to do checkpoint: {}",
+ FLAGS_log_shared_file_count_limit,
+ reserved_log_count,
+ prevent_gc_replicas.size(),
+ oss.str());
for (auto &id : prevent_gc_replicas) {
auto find = rs.find(id);
if (find != rs.end()) {
@@ -1821,7 +1856,7 @@ void replica_stub::on_gc()
find->second.rep->tracker(),
std::bind(&replica_stub::trigger_checkpoint, this,
find->second.rep, true),
id.thread_hash(),
- std::chrono::milliseconds(rand::next_u32(0,
_options.gc_interval_ms / 2)));
+ std::chrono::milliseconds(rand::next_u32(0,
FLAGS_gc_interval_ms / 2)));
}
}
}
@@ -2136,7 +2171,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
int delay_ms = 0;
if (r->status() == partition_status::PS_INACTIVE) {
- delay_ms = _options.gc_memory_replica_interval_ms;
+ delay_ms = FLAGS_gc_memory_replica_interval_ms;
LOG_INFO("{}: delay {} milliseconds to close replica, status =
PS_INACTIVE",
r->name(),
delay_ms);
@@ -2412,7 +2447,7 @@ void replica_stub::register_ctrl_command()
if (args[0] == "DEFAULT") {
// set to default value
_mem_release_max_reserved_mem_percentage =
- _options.mem_release_max_reserved_mem_percentage;
+ FLAGS_mem_release_max_reserved_mem_percentage;
return result;
}
int32_t percentage = 0;
@@ -2435,6 +2470,7 @@ void replica_stub::register_ctrl_command()
#elif defined(DSN_USE_JEMALLOC)
register_jemalloc_ctrl_command();
#endif
+ // TODO(yingchun): use http
_cmds.emplace_back(::dsn::command_manager::instance().register_command(
{"replica.max-concurrent-bulk-load-downloading-count"},
"replica.max-concurrent-bulk-load-downloading-count [num |
DEFAULT]",
diff --git a/src/replica/split/replica_split_manager.cpp
b/src/replica/split/replica_split_manager.cpp
index e501f9041..6c88521a0 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -26,7 +26,9 @@
namespace dsn {
namespace replication {
+
DSN_DECLARE_bool(empty_write_disabled);
+DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
replica_split_manager::replica_split_manager(replica *r)
: replica_base(r), _replica(r), _stub(r->get_replica_stub())
@@ -400,7 +402,7 @@
replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
// temp prepare_list used for apply states
prepare_list plist(_replica,
_replica->_app->last_committed_decree(),
- _replica->_options->max_mutation_count_in_prepare_list,
+ FLAGS_max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree ==
_replica->_app->last_committed_decree() + 1) {
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index f6c4280d5..8bcaa9933 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -38,6 +38,8 @@
namespace dsn {
namespace replication {
+DSN_DECLARE_int32(log_private_file_size_mb);
+
class mock_replication_app_base : public replication_app_base
{
public:
@@ -139,7 +141,7 @@ public:
utils::filesystem::remove_path(log_dir);
_private_log =
- new mutation_log_private(log_dir,
_options->log_private_file_size_mb, get_gpid(), this);
+ new mutation_log_private(log_dir, FLAGS_log_private_file_size_mb,
get_gpid(), this);
error_code err =
_private_log->open(nullptr, [this](error_code err) {
CHECK_EQ_PREFIX(err, ERR_OK); });
diff --git a/src/server/pegasus_server_impl_init.cpp
b/src/server/pegasus_server_impl_init.cpp
index b8db579d1..012ca6ac1 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -39,6 +39,57 @@ DSN_DEFINE_int64(
rocksdb_limiter_max_write_megabytes_per_sec,
500,
"max rate of rocksdb flush and compaction(MB/s), if less than or equal to
0 means close limit");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_max_background_flushes,
+ 4,
+ "rocksdb options.max_background_flushes, flush threads are
shared among all "
+ "rocksdb instances in one process");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_max_background_compactions,
+ 12,
+ "rocksdb options.max_background_compactions, compaction
threads are shared among "
+ "all rocksdb instances in one process");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_max_write_buffer_number,
+ 3,
+ "rocksdb options.max_write_buffer_number");
+DSN_DEFINE_int32(pegasus.server, rocksdb_num_levels, 6, "rocksdb
options.num_levels");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_target_file_size_multiplier,
+ 1,
+ "rocksdb options.target_file_size_multiplier");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_level0_file_num_compaction_trigger,
+ 4,
+ "rocksdb options.level0_file_num_compaction_trigger");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_level0_slowdown_writes_trigger,
+ 30,
+ "rocksdb options.level0_slowdown_writes_trigger, default 30");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_level0_stop_writes_trigger,
+ 60,
+ "rocksdb options.level0_stop_writes_trigger");
+DSN_DEFINE_int32(
+ pegasus.server,
+ rocksdb_block_cache_num_shard_bits,
+ -1,
+ "block cache will be sharded into 2^num_shard_bits shards, default value
is -1(auto)");
+
+// COMPATIBILITY ATTENTION:
+// Although old releases would see the new structure as corrupt filter data
and read the
+// table as if there's no filter, we've decided only to enable the new Bloom
filter with new
+// format_version=5. This provides a smooth path for automatic adoption over
time, with an
+// option for early opt-in.
+// Reference from rocksdb commit:
+//
https://github.com/facebook/rocksdb/commit/f059c7d9b96300091e07429a60f4ad55dac84859
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_format_version,
+ 2,
+ "block based table data format version, only 2 and 5 is
supported in Pegasus. 2 "
+ "is the old version, 5 is the new version supported since
rocksdb v6.6.4");
+DSN_DEFINE_validator(rocksdb_format_version,
+ [](int32_t value) -> bool { return value == 2 || value ==
5; });
DSN_DEFINE_bool(pegasus.server,
rocksdb_limiter_enable_auto_tune,
@@ -70,10 +121,8 @@ DSN_DEFINE_bool(pegasus.server,
DSN_DEFINE_int32(pegasus.server,
read_amp_bytes_per_bit,
0,
- "config for using to calculate the "
- "read amplification, must be a power "
- "of 2, zero means disable count read "
- "amplification");
+ "config for using to calculate the read amplification, must
be a power of 2, zero "
+ "means disable count read amplification");
DSN_DEFINE_validator(read_amp_bytes_per_bit, [](const int64_t
read_amp_bytes_per_bit) -> bool {
return read_amp_bytes_per_bit == 0 ||
@@ -252,19 +301,8 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_db_opts.listeners.emplace_back(new pegasus_event_listener(this));
- // flush threads are shared among all rocksdb instances in one process.
- _db_opts.max_background_flushes =
- (int)dsn_config_get_value_int64("pegasus.server",
- "rocksdb_max_background_flushes",
- 4,
- "rocksdb
options.max_background_flushes");
-
- // compaction threads are shared among all rocksdb instances in one
process.
- _db_opts.max_background_compactions =
- (int)dsn_config_get_value_int64("pegasus.server",
- "rocksdb_max_background_compactions",
- 12,
- "rocksdb
options.max_background_compactions");
+ _db_opts.max_background_flushes = FLAGS_rocksdb_max_background_flushes;
+ _db_opts.max_background_compactions =
FLAGS_rocksdb_max_background_compactions;
// init rocksdb::ColumnFamilyOptions for data column family
_data_cf_opts.write_buffer_size =
@@ -273,14 +311,8 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
64 * 1024 * 1024,
"rocksdb
options.write_buffer_size");
- _data_cf_opts.max_write_buffer_number =
- (int)dsn_config_get_value_int64("pegasus.server",
- "rocksdb_max_write_buffer_number",
- 3,
- "rocksdb
options.max_write_buffer_number");
-
- _data_cf_opts.num_levels = (int)dsn_config_get_value_int64(
- "pegasus.server", "rocksdb_num_levels", 6, "rocksdb
options.num_levels");
+ _data_cf_opts.max_write_buffer_number =
FLAGS_rocksdb_max_write_buffer_number;
+ _data_cf_opts.num_levels = FLAGS_rocksdb_num_levels;
_data_cf_opts.target_file_size_base =
dsn_config_get_value_uint64("pegasus.server",
@@ -288,11 +320,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
64 * 1024 * 1024,
"rocksdb options.target_file_size_base");
- _data_cf_opts.target_file_size_multiplier =
- (int)dsn_config_get_value_int64("pegasus.server",
- "rocksdb_target_file_size_multiplier",
- 1,
- "rocksdb
options.target_file_size_multiplier");
+ _data_cf_opts.target_file_size_multiplier =
FLAGS_rocksdb_target_file_size_multiplier;
_data_cf_opts.max_bytes_for_level_base =
dsn_config_get_value_uint64("pegasus.server",
@@ -308,24 +336,10 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
// we need set max_compaction_bytes definitely because
set_usage_scenario() depends on it.
_data_cf_opts.max_compaction_bytes = _data_cf_opts.target_file_size_base *
25;
-
_data_cf_opts.level0_file_num_compaction_trigger =
- (int)dsn_config_get_value_int64("pegasus.server",
-
"rocksdb_level0_file_num_compaction_trigger",
- 4,
- "rocksdb
options.level0_file_num_compaction_trigger");
-
- _data_cf_opts.level0_slowdown_writes_trigger =
(int)dsn_config_get_value_int64(
- "pegasus.server",
- "rocksdb_level0_slowdown_writes_trigger",
- 30,
- "rocksdb options.level0_slowdown_writes_trigger, default 30");
-
- _data_cf_opts.level0_stop_writes_trigger =
- (int)dsn_config_get_value_int64("pegasus.server",
- "rocksdb_level0_stop_writes_trigger",
- 60,
- "rocksdb
options.level0_stop_writes_trigger");
+ FLAGS_rocksdb_level0_file_num_compaction_trigger;
+ _data_cf_opts.level0_slowdown_writes_trigger =
FLAGS_rocksdb_level0_slowdown_writes_trigger;
+ _data_cf_opts.level0_stop_writes_trigger =
FLAGS_rocksdb_level0_stop_writes_trigger;
std::string compression_str = dsn_config_get_value_string(
"pegasus.server",
@@ -366,15 +380,9 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
10 * 1024 * 1024 * 1024ULL,
"block cache capacity for one pegasus server, shared by all
rocksdb instances");
- // block cache num shard bits, default -1(auto)
- int num_shard_bits = (int)dsn_config_get_value_int64(
- "pegasus.server",
- "rocksdb_block_cache_num_shard_bits",
- -1,
- "block cache will be sharded into 2^num_shard_bits shards");
-
// init block cache
- _s_block_cache = rocksdb::NewLRUCache(capacity, num_shard_bits);
+ _s_block_cache =
+ rocksdb::NewLRUCache(capacity,
FLAGS_rocksdb_block_cache_num_shard_bits);
});
// every replica has the same block cache
@@ -544,25 +552,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rocksdb_bloom_filter_bits_per_key",
10,
"average bits allocated per key in
bloom filter");
- // COMPATIBILITY ATTENTION:
- // Although old releases would see the new structure as corrupt filter
data and read the
- // table as if there's no filter, we've decided only to enable the new
Bloom filter with new
- // format_version=5. This provides a smooth path for automatic
adoption over time, with an
- // option for early opt-in.
- // Reference from rocksdb commit:
- //
https://github.com/facebook/rocksdb/commit/f059c7d9b96300091e07429a60f4ad55dac84859
- int format_version =
- (int)dsn_config_get_value_int64("pegasus.server",
- "rocksdb_format_version",
- 2,
- "block based table data format
version, "
- "only 2 and 5 is supported in
Pegasus. "
- "2 is the old version, 5 is the
new "
- "version supported since rocksdb "
- "v6.6.4");
- CHECK(format_version == 2 || format_version == 5,
- "[pegasus.server]rocksdb_format_version should be either '2' or
'5'.");
- tbl_opts.format_version = format_version;
+ tbl_opts.format_version = FLAGS_rocksdb_format_version;
tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bits_per_key,
false));
std::string filter_type =
diff --git a/src/shell/commands/data_operations.cpp
b/src/shell/commands/data_operations.cpp
index dee21bd3b..1dc54247a 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -20,6 +20,12 @@
#include "shell/commands.h"
#include <fmt/printf.h>
#include "idl_utils.h"
+#include "utils/flags.h"
+
+DSN_DEFINE_int32(threadpool.THREAD_POOL_DEFAULT,
+ worker_count,
+ 0,
+ "get THREAD_POOL_DEFAULT worker_count.");
static void
print_current_scan_state(const std::vector<std::unique_ptr<scan_data_context>>
&contexts,
@@ -1818,13 +1824,10 @@ bool copy_data(command_executor *e, shell_context *sc,
arguments args)
"WARN: used multi_set will lose accurate ttl time per value! "
"ttl time will be assign the max value of this batch data.\n");
op = SCAN_AND_MULTI_SET;
+
+ fprintf(stderr, "INFO: THREAD_POOL_DEFAULT worker_count = %d\n",
FLAGS_worker_count);
// threadpool worker_count should greater than source app scanner count
- int worker_count =
dsn_config_get_value_int64("threadpool.THREAD_POOL_DEFAULT",
- "worker_count",
- 0,
- "get THREAD_POOL_DEFAULT
worker_count.");
- fprintf(stderr, "INFO: THREAD_POOL_DEFAULT worker_count = %d\n",
worker_count);
- if (worker_count <= split_count) {
+ if (FLAGS_worker_count <= split_count) {
fprintf(stderr,
"INFO: THREAD_POOL_DEFAULT worker_count should greater
than source app scanner "
"count %d",
diff --git a/src/test/bench_test/benchmark.cpp
b/src/test/bench_test/benchmark.cpp
index 0d45bf232..6b7372b6d 100644
--- a/src/test/bench_test/benchmark.cpp
+++ b/src/test/bench_test/benchmark.cpp
@@ -24,11 +24,19 @@
#include "rand.h"
#include "runtime/app_model.h"
#include "utils/api_utilities.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
namespace pegasus {
namespace test {
+
+DSN_DECLARE_int32(hashkey_size);
+DSN_DECLARE_int32(pegasus_timeout_ms);
+DSN_DECLARE_int32(sortkey_size);
+DSN_DECLARE_int32(threads);
+DSN_DECLARE_int32(value_size);
+
benchmark::benchmark()
{
_client =
pegasus_client_factory::get_client(config::instance().pegasus_cluster_name.c_str(),
@@ -52,7 +60,7 @@ void benchmark::run()
while (std::getline(benchmark_stream, name, ',')) {
// run the specified benchmark
operation_type op_type = get_operation_type(name);
- run_benchmark(config::instance().threads, op_type);
+ run_benchmark(FLAGS_threads, op_type);
}
}
@@ -111,10 +119,9 @@ void benchmark::write_random(thread_arg *thread)
int try_count = 0;
while (true) {
try_count++;
- int ret = _client->set(hashkey, sortkey, value,
config::instance().pegasus_timeout_ms);
+ int ret = _client->set(hashkey, sortkey, value,
FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
- bytes += config::instance().value_size +
config::instance().hashkey_size +
- config::instance().sortkey_size;
+ bytes += FLAGS_value_size + FLAGS_hashkey_size +
FLAGS_sortkey_size;
count++;
break;
} else if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
@@ -147,7 +154,7 @@ void benchmark::read_random(thread_arg *thread)
int try_count = 0;
while (true) {
try_count++;
- int ret = _client->get(hashkey, sortkey, value,
config::instance().pegasus_timeout_ms);
+ int ret = _client->get(hashkey, sortkey, value,
FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
found++;
bytes += hashkey.size() + sortkey.size() + value.size();
@@ -184,7 +191,7 @@ void benchmark::delete_random(thread_arg *thread)
int try_count = 0;
while (true) {
try_count++;
- int ret = _client->del(hashkey, sortkey,
config::instance().pegasus_timeout_ms);
+ int ret = _client->del(hashkey, sortkey, FLAGS_pegasus_timeout_ms);
if (ret == ::pegasus::PERR_OK) {
break;
} else if (ret != ::pegasus::PERR_TIMEOUT || try_count > 3) {
@@ -202,9 +209,9 @@ void benchmark::delete_random(thread_arg *thread)
void benchmark::generate_kv_pair(std::string &hashkey, std::string &sortkey,
std::string &value)
{
- hashkey = generate_string(config::instance().hashkey_size);
- sortkey = generate_string(config::instance().sortkey_size);
- value = generate_string(config::instance().value_size);
+ hashkey = generate_string(FLAGS_hashkey_size);
+ sortkey = generate_string(FLAGS_sortkey_size);
+ value = generate_string(FLAGS_value_size);
}
operation_type benchmark::get_operation_type(const std::string &name)
@@ -227,14 +234,13 @@ operation_type benchmark::get_operation_type(const std::string &name)
void benchmark::print_header()
{
const config &config_ = config::instance();
- fmt::print(stdout, "Hashkeys: {} bytes each\n", config_.hashkey_size);
- fmt::print(stdout, "Sortkeys: {} bytes each\n", config_.sortkey_size);
- fmt::print(stdout, "Values: {} bytes each\n", config_.value_size);
+ fmt::print(stdout, "Hashkeys: {} bytes each\n", FLAGS_hashkey_size);
+ fmt::print(stdout, "Sortkeys: {} bytes each\n", FLAGS_sortkey_size);
+ fmt::print(stdout, "Values: {} bytes each\n", FLAGS_value_size);
fmt::print(stdout, "Entries: {}\n", config_.num);
fmt::print(stdout,
"FileSize: {} MB (estimated)\n",
- ((config_.hashkey_size + config_.sortkey_size + config_.value_size) * config_.num) >>
- 20);
+ ((FLAGS_hashkey_size + FLAGS_sortkey_size + FLAGS_value_size) * config_.num) >> 20);
print_warnings();
fmt::print(stdout, "------------------------------------------------\n");
diff --git a/src/test/bench_test/config.cpp b/src/test/bench_test/config.cpp
index bdf0fd6b1..16349f6d7 100644
--- a/src/test/bench_test/config.cpp
+++ b/src/test/bench_test/config.cpp
@@ -17,23 +17,29 @@
* under the License.
*/
-#include "utils/config_api.h"
#include "config.h"
+#include "utils/config_api.h"
+#include "utils/flags.h"
+
namespace pegasus {
namespace test {
+DSN_DEFINE_int32(pegasus.benchmark,
+ pegasus_timeout_ms,
+ 1000,
+ "pegasus read/write timeout in milliseconds");
+DSN_DEFINE_int32(pegasus.benchmark, threads, 1, "Number of concurrent threads to run");
+DSN_DEFINE_int32(pegasus.benchmark, hashkey_size, 16, "size of each hashkey");
+DSN_DEFINE_int32(pegasus.benchmark, sortkey_size, 16, "size of each sortkey");
+DSN_DEFINE_int32(pegasus.benchmark, value_size, 100, "Size of each value");
+
config::config()
{
pegasus_cluster_name = dsn_config_get_value_string(
"pegasus.benchmark", "pegasus_cluster_name", "onebox", "pegasus cluster name");
pegasus_app_name = dsn_config_get_value_string(
"pegasus.benchmark", "pegasus_app_name", "temp", "pegasus app name");
- pegasus_timeout_ms =
- (int32_t)dsn_config_get_value_uint64("pegasus.benchmark",
- "pegasus_timeout_ms",
- 1000,
- "pegasus read/write timeout in milliseconds");
benchmarks = dsn_config_get_value_string(
"pegasus.benchmark",
"benchmarks",
@@ -44,14 +50,6 @@ config::config()
"\tdeleterandom_pegasus -- pegasus delete N keys in random order\n");
num = dsn_config_get_value_uint64(
"pegasus.benchmark", "num", 10000, "Number of key/values to place in
database");
- threads = (int32_t)dsn_config_get_value_uint64(
- "pegasus.benchmark", "threads", 1, "Number of concurrent threads to run");
- hashkey_size = (int32_t)dsn_config_get_value_uint64(
- "pegasus.benchmark", "hashkey_size", 16, "size of each hashkey");
- sortkey_size = (int32_t)dsn_config_get_value_uint64(
- "pegasus.benchmark", "sortkey_size", 16, "size of each sortkey");
- value_size = (int32_t)dsn_config_get_value_uint64(
- "pegasus.benchmark", "value_size", 100, "Size of each value");
seed = dsn_config_get_value_uint64(
"pegasus.benchmark",
"seed",
diff --git a/src/test/bench_test/config.h b/src/test/bench_test/config.h
index 749d9a899..f6a5f5f1f 100644
--- a/src/test/bench_test/config.h
+++ b/src/test/bench_test/config.h
@@ -30,20 +30,10 @@ struct config : public dsn::utils::singleton<config>
{
std::string pegasus_cluster_name;
std::string pegasus_app_name;
- // Pegasus read/write/delete timeout in milliseconds
- uint32_t pegasus_timeout_ms;
// Comma-separated list of operations to run
std::string benchmarks;
// Number of key/values to place in database
uint64_t num;
- // Number of concurrent threads to run
- uint32_t threads;
- // size of each value
- uint32_t value_size;
- // size of each hashkey
- uint32_t hashkey_size;
- // size of each sortkey
- uint32_t sortkey_size;
// Seed base for random number generators
uint64_t seed;
// Default environment suitable for the current operating system
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]