This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new ec905d991 refactor(conf): use DSN_DEFINE_int64 to load int64 type of configs (#1357)
ec905d991 is described below
commit ec905d9912d740eb7134384cab84aa970349e1d6
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Feb 22 11:31:52 2023 +0800
refactor(conf): use DSN_DEFINE_int64 to load int64 type of configs (#1357)
---
src/common/replication_common.cpp | 6 ------
src/common/replication_common.h | 1 -
src/server/config.ini | 1 -
src/server/pegasus_server_impl.cpp | 6 +++++-
src/server/pegasus_server_impl.h | 1 -
src/server/pegasus_server_impl_init.cpp | 16 ++++++----------
src/server/pegasus_write_service.cpp | 14 +++++++-------
src/server/pegasus_write_service.h | 1 -
src/server/test/config.ini | 1 -
src/test/pressure_test/main.cpp | 22 ++++++++++------------
10 files changed, 28 insertions(+), 41 deletions(-)
diff --git a/src/common/replication_common.cpp
b/src/common/replication_common.cpp
index f2ff1a410..08d2d5dfa 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -100,7 +100,6 @@ replication_options::replication_options()
group_check_disabled = false;
checkpoint_disabled = false;
- checkpoint_min_decree_gap = 10000;
gc_disabled = false;
@@ -209,11 +208,6 @@ void replication_options::initialize()
"checkpoint_disabled",
checkpoint_disabled,
"whether checkpoint is
disabled");
- checkpoint_min_decree_gap =
- (int64_t)dsn_config_get_value_uint64("replication",
- "checkpoint_min_decree_gap",
- checkpoint_min_decree_gap,
- "minimum decree gap that triggers
checkpoint");
gc_disabled = dsn_config_get_value_bool(
"replication", "gc_disabled", gc_disabled, "whether to disable garbage
collection");
diff --git a/src/common/replication_common.h b/src/common/replication_common.h
index 970ad3a45..e9bf90545 100644
--- a/src/common/replication_common.h
+++ b/src/common/replication_common.h
@@ -76,7 +76,6 @@ public:
bool group_check_disabled;
bool checkpoint_disabled;
- int64_t checkpoint_min_decree_gap;
bool gc_disabled;
diff --git a/src/server/config.ini b/src/server/config.ini
index 86d37ee8c..18c776e23 100644
--- a/src/server/config.ini
+++ b/src/server/config.ini
@@ -255,7 +255,6 @@ stateful = true
checkpoint_disabled = false
checkpoint_interval_seconds = 300
- checkpoint_min_decree_gap = 10000
checkpoint_max_interval_hours = 2
gc_disabled = false
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index 4629304b9..1013e4d71 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -58,6 +58,10 @@ DSN_DEFINE_int32(pegasus.server,
hotkey_analyse_time_interval_s,
10,
"hotkey analyse interval in seconds");
+DSN_DEFINE_int32(pegasus.server,
+ update_rdb_stat_interval,
+ 60,
+ "The interval seconds to update RocksDB statistics, in
seconds.");
static std::string chkpt_get_dir_name(int64_t decree)
{
@@ -1695,7 +1699,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char
**argv)
dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
&_tracker,
[this]() {
this->update_replica_rocksdb_statistics(); },
- _update_rdb_stat_interval);
+
std::chrono::seconds(FLAGS_update_rdb_stat_interval));
// These counters are singletons on this server shared by all replicas,
their metrics update
// task should be scheduled once an interval on the server view.
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 84687353c..8e651b9a3 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -464,7 +464,6 @@ private:
pegasus_context_cache _context_cache;
- std::chrono::seconds _update_rdb_stat_interval;
::dsn::task_ptr _update_replica_rdb_stat;
static ::dsn::task_ptr _update_server_rdb_stat;
diff --git a/src/server/pegasus_server_impl_init.cpp
b/src/server/pegasus_server_impl_init.cpp
index 571f7e09b..edbb49c7c 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -194,6 +194,11 @@ DSN_DEFINE_uint32(pegasus.server,
checkpoint_reserve_time_seconds,
1800,
"Minimum seconds of checkpoint to reserve, 0 means no
check.");
+DSN_DEFINE_int32(pegasus.server,
+ rocksdb_max_open_files,
+ -1,
+ "The number of opened files that can be used by a
replica(namely a DB instance). "
+ "The default value is -1 which means always keep files
opened.");
static const std::unordered_map<std::string,
rocksdb::BlockBasedTableOptions::IndexType>
INDEX_TYPE_STRING_MAP = {
@@ -445,12 +450,7 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_db_opts.write_buffer_manager = _s_write_buffer_manager;
}
- int64_t max_open_files = dsn_config_get_value_int64(
- "pegasus.server",
- "rocksdb_max_open_files",
- -1, /* always keep files opened, default by rocksdb */
- "number of opened files that can be used by a replica(namely a DB
instance)");
- _db_opts.max_open_files = static_cast<int>(max_open_files);
+ _db_opts.max_open_files = FLAGS_rocksdb_max_open_files;
LOG_INFO_PREFIX("rocksdb_max_open_files = {}", _db_opts.max_open_files);
_db_opts.max_log_file_size =
static_cast<size_t>(FLAGS_rocksdb_max_log_file_size);
@@ -593,10 +593,6 @@
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_checkpoint_reserve_min_count = FLAGS_checkpoint_reserve_min_count;
_checkpoint_reserve_time_seconds = FLAGS_checkpoint_reserve_time_seconds;
- // TODO(yingchun): signed integral type of at least 35 bits, int64_t
- _update_rdb_stat_interval =
std::chrono::seconds(dsn_config_get_value_uint64(
- "pegasus.server", "update_rdb_stat_interval", 60,
"update_rdb_stat_interval, in seconds"));
-
// TODO: move the qps/latency counters and it's statistics to
replication_app_base layer
std::string str_gpid = _gpid.to_string();
char name[256];
diff --git a/src/server/pegasus_write_service.cpp
b/src/server/pegasus_write_service.cpp
index a923a3df4..338e81190 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -29,6 +29,12 @@
namespace pegasus {
namespace server {
+DSN_DEFINE_int64(pegasus.server,
+ dup_lagging_write_threshold_ms,
+ 10 * 1000,
+ "If the duration that a write flows from master to slave is
larger than this "
+ "threshold, the write is defined a lagging write.");
+
DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION)
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
@@ -130,12 +136,6 @@
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
COUNTER_TYPE_NUMBER_PERCENTILES,
"the time (in ms) lag between master and slave in the duplication");
- _dup_lagging_write_threshold_ms = dsn_config_get_value_int64(
- "pegasus.server",
- "dup_lagging_write_threshold_ms",
- 10 * 1000,
- "If the duration that a write flows from master to slave is larger
than this threshold, "
- "the write is defined a lagging write.");
_pfc_dup_lagging_writes.init_app_counter(
"app.pegasus",
fmt::format("dup.lagging_writes@{}", app_name()).c_str(),
@@ -327,7 +327,7 @@ int pegasus_write_service::duplicate(int64_t decree,
_pfc_duplicate_qps->increment();
auto cleanup = dsn::defer([this, &request]() {
uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
- if (latency_ms > _dup_lagging_write_threshold_ms) {
+ if (latency_ms > FLAGS_dup_lagging_write_threshold_ms) {
_pfc_dup_lagging_writes->increment();
}
_pfc_dup_time_lag->set(latency_ms);
diff --git a/src/server/pegasus_write_service.h
b/src/server/pegasus_write_service.h
index a93fe779b..2315189f5 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -197,7 +197,6 @@ private:
uint64_t _batch_start_time;
capacity_unit_calculator *_cu_calculator;
- int64_t _dup_lagging_write_threshold_ms;
::dsn::perf_counter_wrapper _pfc_put_qps;
::dsn::perf_counter_wrapper _pfc_multi_put_qps;
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index bc742fc0d..a1125b265 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -165,7 +165,6 @@ group_check_interval_ms = 100000
checkpoint_disabled = false
checkpoint_interval_seconds = 100
-checkpoint_min_decree_gap = 10000
checkpoint_max_interval_hours = 1
gc_disabled = false
diff --git a/src/test/pressure_test/main.cpp b/src/test/pressure_test/main.cpp
index 2f8b926bd..8a03ed61d 100644
--- a/src/test/pressure_test/main.cpp
+++ b/src/test/pressure_test/main.cpp
@@ -40,10 +40,14 @@ DSN_DEFINE_int32(pressureclient, hashkey_len, 64, "hashkey
length");
DSN_DEFINE_int32(pressureclient, sortkey_len, 64, "sortkey length");
DSN_DEFINE_int32(pressureclient, value_len, 64, "value length");
DSN_DEFINE_validator(qps, [](int32_t value) -> bool { return value > 0; });
-
-// generate hashkey/sortkey between [0, ****key_limit]
-static int64_t hashkey_limit;
-static int64_t sortkey_limit;
+DSN_DEFINE_int64(pressureclient,
+ hashkey_limit,
+ 0,
+ "The hashkey range to generate, in format [0,
****key_limit].");
+DSN_DEFINE_int64(pressureclient,
+ sortkey_limit,
+ 0,
+ "The sortkey range to generate, in format [0,
****key_limit].");
// for app
static pegasus_client *pg_client = nullptr;
@@ -59,7 +63,7 @@ std::string fill_string(const std::string &str, int len)
std::string get_hashkey()
{
- std::string key = to_string(dsn::rand::next_u64(0, hashkey_limit));
+ std::string key = to_string(dsn::rand::next_u64(0, FLAGS_hashkey_limit));
if (key.size() >= FLAGS_hashkey_len) {
return key;
} else {
@@ -69,7 +73,7 @@ std::string get_hashkey()
std::string get_sortkey()
{
- std::string key = to_string(dsn::rand::next_u64(0, sortkey_limit));
+ std::string key = to_string(dsn::rand::next_u64(0, FLAGS_sortkey_limit));
if (key.size() >= FLAGS_sortkey_len) {
return key;
} else {
@@ -230,12 +234,6 @@ int main(int argc, const char **argv)
op_name = dsn_config_get_value_string("pressureclient", "operation_name",
"", "operation name");
- hashkey_limit =
- (int64_t)dsn_config_get_value_uint64("pressureclient",
"hashkey_limit", 0, "hashkey limit");
-
- sortkey_limit =
- (int64_t)dsn_config_get_value_uint64("pressureclient",
"sortkey_limit", 0, "sortkey limit");
-
CHECK(!op_name.empty(), "must assign operation name");
LOG_INFO("pressureclient {} qps = {}", op_name, FLAGS_qps);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]