This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 93ce7925 Trying to add the unit of time in variable name (#2276)
93ce7925 is described below
commit 93ce7925b26d619ad81c077191ddd69a7fb55d42
Author: mwish <[email protected]>
AuthorDate: Tue May 7 14:01:25 2024 +0800
Trying to add the unit of time in variable name (#2276)
Co-authored-by: hulk <[email protected]>
---
src/cluster/replication.cc | 2 +-
src/cluster/replication.h | 4 +--
src/commands/cmd_replication.cc | 8 ++---
src/commands/cmd_stream.cc | 6 ++--
src/common/cron.cc | 2 +-
src/common/cron.h | 2 +-
src/common/time_util.h | 1 +
src/server/server.cc | 73 +++++++++++++++++++++--------------------
src/server/server.h | 13 ++++----
src/stats/stats.cc | 8 ++---
src/stats/stats.h | 4 +--
src/storage/rdb.cc | 21 ++++++------
src/storage/storage.cc | 33 ++++++++++---------
src/storage/storage.h | 15 +++++----
src/types/redis_stream.cc | 24 +++++++-------
src/types/redis_stream_base.h | 6 ++--
src/types/redis_string.cc | 20 +++++------
src/types/redis_string.h | 15 +++++----
18 files changed, 132 insertions(+), 125 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 4df05a47..57d8b9bc 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -201,7 +201,7 @@ LOOP_LABEL:
assert(handler_idx_ <= handlers_.size());
DLOG(INFO) << "[replication] Execute handler[" <<
getHandlerName(handler_idx_) << "]";
auto st = getHandlerFunc(handler_idx_)(repl_, bev);
- repl_->last_io_time_.store(util::GetTimeStamp(), std::memory_order_relaxed);
+ repl_->last_io_time_secs_.store(util::GetTimeStamp(),
std::memory_order_relaxed);
switch (st) {
case CBState::NEXT:
++handler_idx_;
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index b7f49717..b223bd6a 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -98,7 +98,7 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()>
&&post_fullsync_cb);
void Stop();
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
- time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }
+ int64_t LastIOTimeSecs() const { return
last_io_time_secs_.load(std::memory_order_relaxed); }
void TimerCB(int, int16_t);
@@ -155,7 +155,7 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
Server *srv_ = nullptr;
engine::Storage *storage_ = nullptr;
std::atomic<ReplState> repl_state_;
- std::atomic<time_t> last_io_time_ = 0;
+ std::atomic<int64_t> last_io_time_secs_ = 0;
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;
diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc
index 0a86a9cc..6beffea8 100644
--- a/src/commands/cmd_replication.cc
+++ b/src/commands/cmd_replication.cc
@@ -242,8 +242,8 @@ class CommandFetchMeta : public Commander {
} else {
LOG(WARNING) << "[replication] Fail to send full data file info " <<
ip << ", error: " << strerror(errno);
}
- auto now = static_cast<time_t>(util::GetTimeStamp());
- srv->storage->SetCheckpointAccessTime(now);
+ auto now_secs = static_cast<time_t>(util::GetTimeStamp());
+ srv->storage->SetCheckpointAccessTimeSecs(now_secs);
}));
if (auto s = util::ThreadDetach(t); !s) {
@@ -311,8 +311,8 @@ class CommandFetchFile : public Commander {
usleep(shortest - duration);
}
}
- auto now = static_cast<time_t>(util::GetTimeStamp());
- srv->storage->SetCheckpointAccessTime(now);
+ auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
+ srv->storage->SetCheckpointAccessTimeSecs(now_secs);
srv->DecrFetchFileThread();
}));
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f8736168..a5fdf795 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -588,7 +588,7 @@ class CommandXInfo : public Commander {
}
output->append(redis::MultiLen(result_vector.size()));
- auto now = util::GetTimeStampMS();
+ auto now_ms = util::GetTimeStampMS();
for (auto const &it : result_vector) {
output->append(conn->HeaderOfMap(4));
output->append(redis::BulkString("name"));
@@ -596,9 +596,9 @@ class CommandXInfo : public Commander {
output->append(redis::BulkString("pending"));
output->append(redis::Integer(it.second.pending_number));
output->append(redis::BulkString("idle"));
- output->append(redis::Integer(now - it.second.last_idle));
+ output->append(redis::Integer(now_ms - it.second.last_idle_ms));
output->append(redis::BulkString("inactive"));
- output->append(redis::Integer(now - it.second.last_active));
+ output->append(redis::Integer(now_ms - it.second.last_active_ms));
}
return Status::OK();
diff --git a/src/common/cron.cc b/src/common/cron.cc
index f4c223bf..2c4a03ba 100644
--- a/src/common/cron.cc
+++ b/src/common/cron.cc
@@ -52,7 +52,7 @@ Status Cron::SetScheduleTime(const std::vector<std::string>
&args) {
return Status::OK();
}
-bool Cron::IsTimeMatch(tm *tm) {
+bool Cron::IsTimeMatch(const tm *tm) {
if (tm->tm_min == last_tm_.tm_min && tm->tm_hour == last_tm_.tm_hour &&
tm->tm_mday == last_tm_.tm_mday &&
tm->tm_mon == last_tm_.tm_mon && tm->tm_wday == last_tm_.tm_wday) {
return false;
diff --git a/src/common/cron.h b/src/common/cron.h
index cba6d275..5385a0ef 100644
--- a/src/common/cron.h
+++ b/src/common/cron.h
@@ -43,7 +43,7 @@ class Cron {
~Cron() = default;
Status SetScheduleTime(const std::vector<std::string> &args);
- bool IsTimeMatch(tm *tm);
+ bool IsTimeMatch(const tm *tm);
std::string ToString() const;
bool IsEnabled() const;
diff --git a/src/common/time_util.h b/src/common/time_util.h
index 1c8dc7b6..9eb6daa4 100644
--- a/src/common/time_util.h
+++ b/src/common/time_util.h
@@ -24,6 +24,7 @@
namespace util {
+/// Get the system timestamp in seconds, milliseconds or microseconds.
template <typename Duration = std::chrono::seconds>
auto GetTimeStamp() {
return
std::chrono::duration_cast<Duration>(std::chrono::system_clock::now().time_since_epoch()).count();
diff --git a/src/server/server.cc b/src/server/server.cc
index e482aefb..7c50a008 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -52,7 +52,7 @@
#include "worker.h"
Server::Server(engine::Storage *storage, Config *config)
- : storage(storage), start_time_(util::GetTimeStamp()), config_(config),
namespace_(storage) {
+ : storage(storage), start_time_secs_(util::GetTimeStamp()),
config_(config), namespace_(storage) {
// init commands stats here to prevent concurrent insert, and cause core
auto commands = redis::CommandTable::GetOriginal();
for (const auto &iter : *commands) {
@@ -179,7 +179,7 @@ Status Server::Start() {
compaction_checker_thread_ = GET_OR_RET(util::CreateThread("compact-check",
[this] {
uint64_t counter = 0;
- time_t last_compact_date = 0;
+ int64_t last_compact_date = 0;
CompactionChecker compaction_checker{this->storage};
while (!stop_) {
@@ -192,11 +192,9 @@ Status Server::Start() {
if (!is_loading_ && ++counter % 600 == 0 // check every minute
&& config_->compaction_checker_range.Enabled()) {
- auto now = static_cast<time_t>(util::GetTimeStamp());
- std::tm local_time{};
- localtime_r(&now, &local_time);
- if (local_time.tm_hour >= config_->compaction_checker_range.start &&
- local_time.tm_hour <= config_->compaction_checker_range.stop) {
+ auto now_hours = util::GetTimeStamp<std::chrono::hours>();
+ if (now_hours >= config_->compaction_checker_range.start &&
+ now_hours <= config_->compaction_checker_range.stop) {
std::vector<std::string> cf_names =
{engine::kMetadataColumnFamilyName, engine::kSubkeyColumnFamilyName,
engine::kZSetScoreColumnFamilyName, engine::kStreamColumnFamilyName};
for (const auto &cf_name : cf_names) {
@@ -204,8 +202,8 @@ Status Server::Start() {
}
}
// compact once per day
- if (now != 0 && last_compact_date != now / 86400) {
- last_compact_date = now / 86400;
+ if (now_hours != 0 && last_compact_date != now_hours / 24) {
+ last_compact_date = now_hours / 24;
compaction_checker.CompactPropagateAndPubSubFiles();
}
}
@@ -344,9 +342,9 @@ void Server::CleanupExitedSlaves() {
void Server::FeedMonitorConns(redis::Connection *conn, const
std::vector<std::string> &tokens) {
if (monitor_clients_ <= 0) return;
- auto now = util::GetTimeStampUS();
+ auto now_us = util::GetTimeStampUS();
std::string output =
- fmt::format("{}.{} [{} {}]", now / 1000000, now % 1000000,
conn->GetNamespace(), conn->GetAddr());
+ fmt::format("{}.{} [{} {}]", now_us / 1000000, now_us % 1000000,
conn->GetNamespace(), conn->GetAddr());
for (const auto &token : tokens) {
output += " \"";
output += util::EscapeString(token);
@@ -674,7 +672,7 @@ void Server::OnEntryAddedToStream(const std::string &ns,
const std::string &key,
}
}
-void Server::updateCachedTime() { unix_time.store(util::GetTimeStamp()); }
+void Server::updateCachedTime() { unix_time_secs.store(util::GetTimeStamp()); }
int Server::IncrClientNum() {
total_clients_.fetch_add(1, std::memory_order_relaxed);
@@ -787,13 +785,14 @@ void Server::cron() {
// No replica uses this checkpoint, we can remove it.
if (counter != 0 && counter % 100 == 0) {
- time_t create_time = storage->GetCheckpointCreateTime();
- time_t access_time = storage->GetCheckpointAccessTime();
+ int64_t create_time_secs = storage->GetCheckpointCreateTimeSecs();
+ int64_t access_time_secs = storage->GetCheckpointAccessTimeSecs();
if (storage->ExistCheckpoint()) {
// TODO(shooterit): support to config the alive time of checkpoint
- auto now = static_cast<time_t>(util::GetTimeStamp());
- if ((GetFetchFileThreadNum() == 0 && now - access_time > 30) || (now -
create_time > 24 * 60 * 60)) {
+ int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
+ if ((GetFetchFileThreadNum() == 0 && now_secs - access_time_secs > 30)
||
+ (now_secs - create_time_secs > 24 * 60 * 60)) {
auto s = rocksdb::DestroyDB(config_->checkpoint_dir,
rocksdb::Options());
if (!s.ok()) {
LOG(WARNING) << "[server] Fail to clean checkpoint, error: " <<
s.ToString();
@@ -963,9 +962,9 @@ void Server::GetServerInfo(std::string *info) {
string_stream << "arch_bits:" << sizeof(void *) * 8 << "\r\n";
string_stream << "process_id:" << getpid() << "\r\n";
string_stream << "tcp_port:" << config_->port << "\r\n";
- int64_t now = util::GetTimeStamp();
- string_stream << "uptime_in_seconds:" << now - start_time_ << "\r\n";
- string_stream << "uptime_in_days:" << (now - start_time_) / 86400 << "\r\n";
+ int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
+ string_stream << "uptime_in_seconds:" << now_secs - start_time_secs_ <<
"\r\n";
+ string_stream << "uptime_in_days:" << (now_secs - start_time_secs_) / 86400
<< "\r\n";
*info = string_stream.str();
}
@@ -1000,14 +999,14 @@ void Server::GetReplicationInfo(std::string *info) {
string_stream << "# Replication\r\n";
string_stream << "role:" << (IsSlave() ? "slave" : "master") << "\r\n";
if (IsSlave()) {
- time_t now = util::GetTimeStamp();
+ int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
string_stream << "master_host:" << master_host_ << "\r\n";
string_stream << "master_port:" << master_port_ << "\r\n";
ReplState state = GetReplicationState();
string_stream << "master_link_status:" << (state == kReplConnected ? "up"
: "down") << "\r\n";
string_stream << "master_sync_unrecoverable_error:" << (state ==
kReplError ? "yes" : "no") << "\r\n";
string_stream << "master_sync_in_progress:" << (state == kReplFetchMeta ||
state == kReplFetchSST) << "\r\n";
- string_stream << "master_last_io_seconds_ago:" << now -
replication_thread_->LastIOTime() << "\r\n";
+ string_stream << "master_last_io_seconds_ago:" << now_secs -
replication_thread_->LastIOTimeSecs() << "\r\n";
string_stream << "slave_repl_offset:" << storage->LatestSeqNumber() <<
"\r\n";
string_stream << "slave_priority:" << config_->slave_priority << "\r\n";
}
@@ -1091,15 +1090,15 @@ void Server::SetLastRandomKeyCursor(const std::string
&cursor) {
}
int64_t Server::GetCachedUnixTime() {
- if (unix_time.load() == 0) {
+ if (unix_time_secs.load() == 0) {
updateCachedTime();
}
- return unix_time.load();
+ return unix_time_secs.load();
}
int64_t Server::GetLastBgsaveTime() {
std::lock_guard<std::mutex> lg(db_job_mu_);
- return last_bgsave_time_ == -1 ? start_time_ : last_bgsave_time_;
+ return last_bgsave_timestamp_secs_ == -1 ? start_time_secs_ :
last_bgsave_timestamp_secs_;
}
void Server::GetStatsInfo(std::string *info) {
@@ -1141,7 +1140,7 @@ void Server::GetCommandsStatsInfo(std::string *info) {
auto latency = cmd_stat.second.latency.load();
string_stream << "cmdstat_" << cmd_stat.first << ":calls=" << calls <<
",usec=" << latency
- << ",usec_per_call=" << ((calls == 0) ? 0 :
static_cast<float>(latency / calls)) << "\r\n";
+ << ",usec_per_call=" << static_cast<float>(latency / calls)
<< "\r\n";
}
*info = string_stream.str();
@@ -1195,9 +1194,10 @@ void Server::GetInfo(const std::string &ns, const
std::string §ion, std::str
std::lock_guard<std::mutex> lg(db_job_mu_);
string_stream << "bgsave_in_progress:" << (is_bgsave_in_progress_ ? 1 : 0)
<< "\r\n";
- string_stream << "last_bgsave_time:" << (last_bgsave_time_ == -1 ?
start_time_ : last_bgsave_time_) << "\r\n";
+ string_stream << "last_bgsave_time:"
+ << (last_bgsave_timestamp_secs_ == -1 ? start_time_secs_ :
last_bgsave_timestamp_secs_) << "\r\n";
string_stream << "last_bgsave_status:" << last_bgsave_status_ << "\r\n";
- string_stream << "last_bgsave_time_sec:" << last_bgsave_time_sec_ <<
"\r\n";
+ string_stream << "last_bgsave_time_sec:" << last_bgsave_duration_secs_ <<
"\r\n";
}
if (all || section == "stats") {
@@ -1249,8 +1249,9 @@ void Server::GetInfo(const std::string &ns, const
std::string §ion, std::str
KeyNumStats stats;
GetLatestKeyNumStats(ns, &stats);
- time_t last_scan_time = GetLastScanTime(ns);
- tm last_scan_tm{};
+ // FIXME(mwish): output still requires std::tm.
+ auto last_scan_time = static_cast<time_t>(GetLastScanTime(ns));
+ std::tm last_scan_tm{};
localtime_r(&last_scan_time, &last_scan_tm);
if (section_cnt++) string_stream << "\r\n";
@@ -1393,15 +1394,15 @@ Status Server::AsyncBgSaveDB() {
is_bgsave_in_progress_ = true;
return task_runner_.TryPublish([this] {
- auto start_bgsave_time = util::GetTimeStamp();
+ auto start_bgsave_time_secs = util::GetTimeStamp<std::chrono::seconds>();
Status s = storage->CreateBackup();
- auto stop_bgsave_time = util::GetTimeStamp();
+ auto stop_bgsave_time_secs = util::GetTimeStamp<std::chrono::seconds>();
std::lock_guard<std::mutex> lg(db_job_mu_);
is_bgsave_in_progress_ = false;
- last_bgsave_time_ = start_bgsave_time;
+ last_bgsave_timestamp_secs_ = start_bgsave_time_secs;
last_bgsave_status_ = s.IsOK() ? "ok" : "err";
- last_bgsave_time_sec_ = stop_bgsave_time - start_bgsave_time;
+ last_bgsave_duration_secs_ = stop_bgsave_time_secs -
start_bgsave_time_secs;
});
}
@@ -1436,7 +1437,7 @@ Status Server::AsyncScanDBSize(const std::string &ns) {
std::lock_guard<std::mutex> lg(db_job_mu_);
db_scan_infos_[ns].key_num_stats = stats;
- db_scan_infos_[ns].last_scan_time = util::GetTimeStamp();
+ db_scan_infos_[ns].last_scan_time_secs = util::GetTimeStamp();
db_scan_infos_[ns].is_scanning = false;
});
}
@@ -1529,10 +1530,10 @@ void Server::GetLatestKeyNumStats(const std::string
&ns, KeyNumStats *stats) {
}
}
-time_t Server::GetLastScanTime(const std::string &ns) {
+int64_t Server::GetLastScanTime(const std::string &ns) const {
auto iter = db_scan_infos_.find(ns);
if (iter != db_scan_infos_.end()) {
- return iter->second.last_scan_time;
+ return iter->second.last_scan_time_secs;
}
return 0;
}
diff --git a/src/server/server.h b/src/server/server.h
index a0f0477b..ad967c77 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -56,7 +56,8 @@
constexpr const char *REDIS_VERSION = "4.0.0";
struct DBScanInfo {
- time_t last_scan_time = 0;
+ // Last scan system clock in seconds
+ int64_t last_scan_time_secs = 0;
KeyNumStats key_num_stats;
bool is_scanning = false;
};
@@ -249,7 +250,7 @@ class Server {
Status AsyncPurgeOldBackups(uint32_t num_backups_to_keep, uint32_t
backup_max_keep_hours);
Status AsyncScanDBSize(const std::string &ns);
void GetLatestKeyNumStats(const std::string &ns, KeyNumStats *stats);
- time_t GetLastScanTime(const std::string &ns);
+ int64_t GetLastScanTime(const std::string &ns) const;
std::string GenerateCursorFromKeyName(const std::string &key_name,
CursorType cursor_type, const char *prefix = "");
std::string GetKeyNameFromCursor(const std::string &cursor, CursorType
cursor_type);
@@ -294,7 +295,7 @@ class Server {
Stats stats;
engine::Storage *storage;
std::unique_ptr<Cluster> cluster;
- static inline std::atomic<int64_t> unix_time = 0;
+ static inline std::atomic<int64_t> unix_time_secs = 0;
std::unique_ptr<SlotMigrator> slot_migrator;
std::unique_ptr<SlotImport> slot_import;
@@ -325,7 +326,7 @@ class Server {
std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
- int64_t start_time_;
+ int64_t start_time_secs_;
std::mutex slaveof_mu_;
std::string master_host_;
uint32_t master_port_ = 0;
@@ -355,9 +356,9 @@ class Server {
std::mutex db_job_mu_;
bool db_compacting_ = false;
bool is_bgsave_in_progress_ = false;
- int64_t last_bgsave_time_ = -1;
+ int64_t last_bgsave_timestamp_secs_ = -1;
std::string last_bgsave_status_ = "ok";
- int64_t last_bgsave_time_sec_ = -1;
+ int64_t last_bgsave_duration_secs_ = -1;
std::map<std::string, DBScanInfo> db_scan_infos_;
diff --git a/src/stats/stats.cc b/src/stats/stats.cc
index 115fc4d9..ae18638b 100644
--- a/src/stats/stats.cc
+++ b/src/stats/stats.cc
@@ -29,7 +29,7 @@
Stats::Stats() {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
- im.last_sample_time = 0;
+ im.last_sample_time_ms = 0;
im.last_sample_count = 0;
im.idx = 0;
for (uint64_t &sample : im.samples) {
@@ -93,15 +93,15 @@ void Stats::IncrLatency(uint64_t latency, const std::string
&command_name) {
}
void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
- uint64_t curr_time = util::GetTimeStampMS();
+ uint64_t curr_time_ms = util::GetTimeStampMS();
std::unique_lock<std::shared_mutex> lock(inst_metrics_mutex);
- uint64_t t = curr_time - inst_metrics[metric].last_sample_time;
+ uint64_t t = curr_time_ms - inst_metrics[metric].last_sample_time_ms;
uint64_t ops = current_reading - inst_metrics[metric].last_sample_count;
uint64_t ops_sec = t > 0 ? (ops * 1000 / t) : 0;
inst_metrics[metric].samples[inst_metrics[metric].idx] = ops_sec;
inst_metrics[metric].idx++;
inst_metrics[metric].idx %= STATS_METRIC_SAMPLES;
- inst_metrics[metric].last_sample_time = curr_time;
+ inst_metrics[metric].last_sample_time_ms = curr_time_ms;
inst_metrics[metric].last_sample_count = current_reading;
}
diff --git a/src/stats/stats.h b/src/stats/stats.h
index 88ab2108..6fdba09a 100644
--- a/src/stats/stats.h
+++ b/src/stats/stats.h
@@ -49,8 +49,8 @@ struct CommandStat {
};
struct InstMetric {
- uint64_t last_sample_time; // Timestamp of the last sample in ms
- uint64_t last_sample_count; // Count in the last sample
+ uint64_t last_sample_time_ms; // Timestamp of the last sample in ms
+ uint64_t last_sample_count; // Count in the last sample
uint64_t samples[STATS_METRIC_SAMPLES];
int idx;
};
diff --git a/src/storage/rdb.cc b/src/storage/rdb.cc
index 79c0b8c8..f513caca 100644
--- a/src/storage/rdb.cc
+++ b/src/storage/rdb.cc
@@ -459,11 +459,11 @@ Status RDB::saveRdbObject(int type, const std::string
&key, const RedisObjValue
if (type == RDBTypeString) {
const auto &value = std::get<std::string>(obj);
redis::String string_db(storage_, ns_);
- uint64_t expire = 0;
+ uint64_t expire_ms = 0;
if (ttl_ms > 0) {
- expire = ttl_ms + util::GetTimeStampMS();
+ expire_ms = ttl_ms + util::GetTimeStampMS();
}
- db_status = string_db.SetEX(key, value, expire);
+ db_status = string_db.SetEX(key, value, expire_ms);
} else if (type == RDBTypeSet || type == RDBTypeSetIntSet || type ==
RDBTypeSetListPack) {
const auto &members = std::get<std::vector<std::string>>(obj);
redis::Set set_db(storage_, ns_);
@@ -567,21 +567,20 @@ Status RDB::LoadRdb(uint32_t db_index, bool
overwrite_exist_key) {
return {Status::NotOK, fmt::format("Can't handle RDB format version {}",
rdb_ver)};
}
- uint64_t expire_time = 0;
+ uint64_t expire_time_ms = 0;
int64_t expire_keys = 0;
int64_t load_keys = 0;
int64_t empty_keys_skipped = 0;
- auto now = util::GetTimeStampMS();
+ auto now_ms = util::GetTimeStampMS();
uint32_t db_id = 0;
uint64_t skip_exist_keys = 0;
while (true) {
auto type = GET_OR_RET(LogWhenError(loadRdbType()));
if (type == RDBOpcodeExpireTime) {
- expire_time =
static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds())));
- expire_time *= 1000;
+ expire_time_ms =
static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds()))) *
1000;
continue;
} else if (type == RDBOpcodeExpireTimeMs) {
- expire_time =
GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
+ expire_time_ms =
GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
continue;
} else if (type == RDBOpcodeFreq) { // LFU frequency: not
use in kvrocks
GET_OR_RET(LogWhenError(stream_->ReadByte())); // discard the value
@@ -637,8 +636,8 @@ Status RDB::LoadRdb(uint32_t db_index, bool
overwrite_exist_key) {
LOG(WARNING) << "skipping empty key: " << key;
}
continue;
- } else if (expire_time != 0 &&
- expire_time < now) { // in redis this used to feed this
deletion to any connected replicas
+ } else if (expire_time_ms != 0 &&
+ expire_time_ms < now_ms) { // in redis this used to feed this
deletion to any connected replicas
expire_keys++;
continue;
}
@@ -655,7 +654,7 @@ Status RDB::LoadRdb(uint32_t db_index, bool
overwrite_exist_key) {
}
}
- auto ret = saveRdbObject(type, key, value, expire_time);
+ auto ret = saveRdbObject(type, key, value, expire_time_ms);
if (!ret.IsOK()) {
LOG(WARNING) << "save rdb object key " << key << " failed: " <<
ret.Msg();
} else {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 1759d2e7..b25ae8a8 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -67,7 +67,7 @@ constexpr double kRocksdbLRUBlockCacheHighPriPoolRatio = 0.75;
constexpr double kRocksdbLRURowCacheHighPriPoolRatio = 0.5;
// used in creating rocksdb::HyperClockCache, set`estimated_entry_charge` to 0
means let rocksdb dynamically and
-// automacally adjust the table size for the cache.
+// automatically adjust the table size for the cache.
constexpr size_t kRockdbHCCAutoAdjustCharge = 0;
const int64_t kIORateLimitMaxMb = 1024000;
@@ -75,7 +75,7 @@ const int64_t kIORateLimitMaxMb = 1024000;
using rocksdb::Slice;
Storage::Storage(Config *config)
- : backup_creating_time_(util::GetTimeStamp()),
+ : backup_creating_time_secs_(util::GetTimeStamp<std::chrono::seconds>()),
env_(rocksdb::Env::Default()),
config_(config),
lock_mgr_(16),
@@ -421,8 +421,8 @@ Status Storage::CreateBackup(uint64_t *sequence_number) {
return {Status::NotOK, s.ToString()};
}
- // 'backup_mu_' can guarantee 'backup_creating_time_' is thread-safe
- backup_creating_time_ = static_cast<time_t>(util::GetTimeStamp());
+ // 'backup_mu_' can guarantee 'backup_creating_time_secs_' is thread-safe
+ backup_creating_time_secs_ = util::GetTimeStamp<std::chrono::seconds>();
LOG(INFO) << "[storage] Success to create new backup";
return Status::OK();
@@ -546,7 +546,7 @@ void Storage::EmptyDB() {
}
void Storage::PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t
backup_max_keep_hours) {
- time_t now = util::GetTimeStamp();
+ auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
std::lock_guard<std::mutex> lg(config_->backup_mu);
std::string task_backup_dir = config_->backup_dir;
@@ -555,13 +555,14 @@ void Storage::PurgeOldBackups(uint32_t
num_backups_to_keep, uint32_t backup_max_
if (!s.ok()) return;
// No backup is needed to keep or the backup is expired, we will clean it.
- bool backup_expired = (backup_max_keep_hours != 0 && backup_creating_time_ +
backup_max_keep_hours * 3600 < now);
+ bool backup_expired =
+ (backup_max_keep_hours != 0 && backup_creating_time_secs_ +
backup_max_keep_hours * 3600 < now_secs);
if (num_backups_to_keep == 0 || backup_expired) {
s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options());
if (s.ok()) {
- LOG(INFO) << "[storage] Succeeded cleaning old backup that was created
at " << backup_creating_time_;
+ LOG(INFO) << "[storage] Succeeded cleaning old backup that was created
at " << backup_creating_time_secs_;
} else {
- LOG(INFO) << "[storage] Failed cleaning old backup that was created at "
<< backup_creating_time_
+ LOG(INFO) << "[storage] Failed cleaning old backup that was created at "
<< backup_creating_time_secs_
<< ". Error: " << s.ToString();
}
}
@@ -975,9 +976,9 @@ Status
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
uint64_t checkpoint_latest_seq = 0;
s = checkpoint->CreateCheckpoint(data_files_dir,
storage->config_->rocks_db.write_buffer_size * MiB,
&checkpoint_latest_seq);
- auto now = static_cast<time_t>(util::GetTimeStamp());
- storage->checkpoint_info_.create_time = now;
- storage->checkpoint_info_.access_time = now;
+ auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
+ storage->checkpoint_info_.create_time_secs = now_secs;
+ storage->checkpoint_info_.access_time_secs = now_secs;
storage->checkpoint_info_.latest_seq = checkpoint_latest_seq;
if (!s.ok()) {
LOG(WARNING) << "[storage] Failed to create checkpoint (snapshot).
Error: " << s.ToString();
@@ -987,12 +988,12 @@ Status
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
LOG(INFO) << "[storage] Create checkpoint successfully";
} else {
// Replicas can share checkpoint to replication if the checkpoint existing
time is less than a half of WAL TTL.
- int64_t can_shared_time = storage->config_->rocks_db.wal_ttl_seconds / 2;
- if (can_shared_time > 60 * 60) can_shared_time = 60 * 60;
- if (can_shared_time < 10 * 60) can_shared_time = 10 * 60;
+ int64_t can_shared_time_secs = storage->config_->rocks_db.wal_ttl_seconds
/ 2;
+ if (can_shared_time_secs > 60 * 60) can_shared_time_secs = 60 * 60;
+ if (can_shared_time_secs < 10 * 60) can_shared_time_secs = 10 * 60;
- auto now = static_cast<time_t>(util::GetTimeStamp());
- if (now - storage->GetCheckpointCreateTime() > can_shared_time) {
+ auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
+ if (now_secs - storage->GetCheckpointCreateTimeSecs() >
can_shared_time_secs) {
LOG(WARNING) << "[storage] Can't use current checkpoint, waiting next
checkpoint";
return {Status::NotOK, "Can't use current checkpoint, waiting for next
checkpoint"};
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 208499b5..09151baa 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -215,8 +215,10 @@ class Storage {
static int OpenDataFile(Storage *storage, const std::string &rel_file,
uint64_t *file_size);
static Status CleanInvalidFiles(Storage *storage, const std::string &dir,
std::vector<std::string> valid_files);
struct CheckpointInfo {
- std::atomic<time_t> create_time = 0;
- std::atomic<time_t> access_time = 0;
+ // System clock time when the checkpoint was created.
+ std::atomic<int64_t> create_time_secs = 0;
+ // System clock time when the checkpoint was last accessed.
+ std::atomic<int64_t> access_time_secs = 0;
uint64_t latest_seq = 0;
};
@@ -238,9 +240,9 @@ class Storage {
bool ExistCheckpoint();
bool ExistSyncCheckpoint();
- time_t GetCheckpointCreateTime() const { return
checkpoint_info_.create_time; }
- void SetCheckpointAccessTime(time_t t) { checkpoint_info_.access_time = t; }
- time_t GetCheckpointAccessTime() const { return
checkpoint_info_.access_time; }
+ int64_t GetCheckpointCreateTimeSecs() const { return
checkpoint_info_.create_time_secs; }
+ void SetCheckpointAccessTimeSecs(int64_t t) {
checkpoint_info_.access_time_secs = t; }
+ int64_t GetCheckpointAccessTimeSecs() const { return
checkpoint_info_.access_time_secs; }
void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ =
yes_or_no; }
bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; }
@@ -251,7 +253,8 @@ class Storage {
private:
std::unique_ptr<rocksdb::DB> db_ = nullptr;
std::string replid_;
- time_t backup_creating_time_;
+ // The system clock time when the backup was created.
+ int64_t backup_creating_time_secs_;
std::unique_ptr<rocksdb::BackupEngine> backup_ = nullptr;
rocksdb::Env *env_;
std::shared_ptr<rocksdb::SstFileManager> sst_file_manager_;
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 1b9444ed..6c66f800 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -237,8 +237,8 @@ std::string
Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
std::string Stream::encodeStreamConsumerMetadataValue(const
StreamConsumerMetadata &consumer_metadata) {
std::string dst;
PutFixed64(&dst, consumer_metadata.pending_number);
- PutFixed64(&dst, consumer_metadata.last_idle);
- PutFixed64(&dst, consumer_metadata.last_active);
+ PutFixed64(&dst, consumer_metadata.last_idle_ms);
+ PutFixed64(&dst, consumer_metadata.last_active_ms);
return dst;
}
@@ -246,8 +246,8 @@ StreamConsumerMetadata
Stream::decodeStreamConsumerMetadataValue(const std::stri
StreamConsumerMetadata consumer_metadata;
rocksdb::Slice input(value);
GetFixed64(&input, &consumer_metadata.pending_number);
- GetFixed64(&input, &consumer_metadata.last_idle);
- GetFixed64(&input, &consumer_metadata.last_active);
+ GetFixed64(&input, &consumer_metadata.last_idle_ms);
+ GetFixed64(&input, &consumer_metadata.last_active_ms);
return consumer_metadata;
}
@@ -277,7 +277,7 @@ StreamEntryID
Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std:
std::string Stream::encodeStreamPelEntryValue(const StreamPelEntry &pel_entry)
{
std::string dst;
- PutFixed64(&dst, pel_entry.last_delivery_time);
+ PutFixed64(&dst, pel_entry.last_delivery_time_ms);
PutFixed64(&dst, pel_entry.last_delivery_count);
PutFixed64(&dst, pel_entry.consumer_name.size());
dst += pel_entry.consumer_name;
@@ -287,7 +287,7 @@ std::string Stream::encodeStreamPelEntryValue(const
StreamPelEntry &pel_entry) {
StreamPelEntry Stream::decodeStreamPelEntryValue(const std::string &value) {
StreamPelEntry pel_entry;
rocksdb::Slice input(value);
- GetFixed64(&input, &pel_entry.last_delivery_time);
+ GetFixed64(&input, &pel_entry.last_delivery_time_ms);
GetFixed64(&input, &pel_entry.last_delivery_count);
uint64_t consumer_name_len = 0;
GetFixed64(&input, &consumer_name_len);
@@ -487,8 +487,8 @@ rocksdb::Status Stream::createConsumerWithoutLock(const
Slice &stream_name, cons
StreamConsumerMetadata consumer_metadata;
auto now = util::GetTimeStampMS();
- consumer_metadata.last_idle = now;
- consumer_metadata.last_active = now;
+ consumer_metadata.last_idle_ms = now;
+ consumer_metadata.last_active_ms = now;
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
std::string consumer_value =
encodeStreamConsumerMetadataValue(consumer_metadata);
std::string get_consumer_value;
@@ -1153,9 +1153,9 @@ rocksdb::Status Stream::RangeWithPending(const Slice
&stream_name, StreamRangeOp
return s;
}
StreamConsumerMetadata consumer_metadata =
decodeStreamConsumerMetadataValue(get_consumer_value);
- auto now = util::GetTimeStampMS();
- consumer_metadata.last_idle = now;
- consumer_metadata.last_active = now;
+ auto now_ms = util::GetTimeStampMS();
+ consumer_metadata.last_idle_ms = now_ms;
+ consumer_metadata.last_active_ms = now_ms;
if (latest) {
options.start = consumergroup_metadata.last_delivered_id;
@@ -1219,7 +1219,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice
&stream_name, StreamRangeOp
}
entries->emplace_back(entry_id.ToString(), std::move(values));
pel_entry.last_delivery_count += 1;
- pel_entry.last_delivery_time = now;
+ pel_entry.last_delivery_time_ms = now_ms;
batch->Put(stream_cf_handle_, iter->key(),
encodeStreamPelEntryValue(pel_entry));
++count;
if (count >= options.count) break;
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 60d54d23..ae90a3e5 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -171,8 +171,8 @@ struct StreamConsumerGroupMetadata {
struct StreamConsumerMetadata {
uint64_t pending_number = 0;
- uint64_t last_idle;
- uint64_t last_active;
+ uint64_t last_idle_ms;
+ uint64_t last_active_ms;
};
enum class StreamSubkeyType {
@@ -183,7 +183,7 @@ enum class StreamSubkeyType {
};
struct StreamPelEntry {
- uint64_t last_delivery_time;
+ uint64_t last_delivery_time_ms;
uint64_t last_delivery_count;
std::string consumer_name;
};
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index e0f0c99d..0f0d4688 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -255,21 +255,21 @@ rocksdb::Status String::Set(const std::string &user_key,
const std::string &valu
return updateRawValue(ns_key, new_raw_value);
}
-rocksdb::Status String::SetEX(const std::string &user_key, const std::string
&value, uint64_t expire) {
+rocksdb::Status String::SetEX(const std::string &user_key, const std::string
&value, uint64_t expire_ms) {
std::optional<std::string> ret;
- return Set(user_key, value, {expire, StringSetType::NONE, /*get=*/false,
/*keep_ttl=*/false}, ret);
+ return Set(user_key, value, {expire_ms, StringSetType::NONE, /*get=*/false,
/*keep_ttl=*/false}, ret);
}
-rocksdb::Status String::SetNX(const std::string &user_key, const std::string
&value, uint64_t expire, bool *flag) {
+rocksdb::Status String::SetNX(const std::string &user_key, const std::string
&value, uint64_t expire_ms, bool *flag) {
std::optional<std::string> ret;
- auto s = Set(user_key, value, {expire, StringSetType::NX, /*get=*/false,
/*keep_ttl=*/false}, ret);
+ auto s = Set(user_key, value, {expire_ms, StringSetType::NX, /*get=*/false,
/*keep_ttl=*/false}, ret);
*flag = ret.has_value();
return s;
}
-rocksdb::Status String::SetXX(const std::string &user_key, const std::string
&value, uint64_t expire, bool *flag) {
+rocksdb::Status String::SetXX(const std::string &user_key, const std::string
&value, uint64_t expire_ms, bool *flag) {
std::optional<std::string> ret;
- auto s = Set(user_key, value, {expire, StringSetType::XX, /*get=*/false,
/*keep_ttl=*/false}, ret);
+ auto s = Set(user_key, value, {expire_ms, StringSetType::XX, /*get=*/false,
/*keep_ttl=*/false}, ret);
*flag = ret.has_value();
return s;
}
@@ -384,7 +384,7 @@ rocksdb::Status String::IncrByFloat(const std::string
&user_key, double incremen
return updateRawValue(ns_key, raw_value);
}
-rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t
expire, bool lock) {
+rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t
expire_ms, bool lock) {
// Data race, key string maybe overwrite by other key while didn't lock the
keys here,
// to improve the set performance
std::optional<MultiLockGuard> guard;
@@ -404,7 +404,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair>
&pairs, uint64_t expi
for (const auto &pair : pairs) {
std::string bytes;
Metadata metadata(kRedisString, false);
- metadata.expire = expire;
+ metadata.expire = expire_ms;
metadata.Encode(&bytes);
bytes.append(pair.value.data(), pair.value.size());
std::string ns_key = AppendNamespacePrefix(pair.key);
@@ -413,7 +413,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair>
&pairs, uint64_t expi
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
-rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t
expire, bool *flag) {
+rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t
expire_ms, bool *flag) {
*flag = false;
int exists = 0;
@@ -435,7 +435,7 @@ rocksdb::Status String::MSetNX(const
std::vector<StringPair> &pairs, uint64_t ex
return rocksdb::Status::OK();
}
- rocksdb::Status s = MSet(pairs, /*expire=*/expire, /*lock=*/false);
+ rocksdb::Status s = MSet(pairs, /*expire_ms=*/expire_ms, /*lock=*/false);
if (!s.ok()) return s;
*flag = true;
diff --git a/src/types/redis_string.h b/src/types/redis_string.h
index faf30259..34afb0bd 100644
--- a/src/types/redis_string.h
+++ b/src/types/redis_string.h
@@ -37,6 +37,7 @@ struct StringPair {
enum class StringSetType { NONE, NX, XX };
struct StringSetArgs {
+ // Expire time in mill seconds.
uint64_t expire;
StringSetType type;
bool get;
@@ -85,24 +86,24 @@ class String : public Database {
rocksdb::Status Set(const std::string &user_key, const std::string &value);
rocksdb::Status Set(const std::string &user_key, const std::string &value,
StringSetArgs args,
std::optional<std::string> &ret);
- rocksdb::Status SetEX(const std::string &user_key, const std::string &value,
uint64_t expire);
- rocksdb::Status SetNX(const std::string &user_key, const std::string &value,
uint64_t expire, bool *flag);
- rocksdb::Status SetXX(const std::string &user_key, const std::string &value,
uint64_t expire, bool *flag);
+ rocksdb::Status SetEX(const std::string &user_key, const std::string &value,
uint64_t expire_ms);
+ rocksdb::Status SetNX(const std::string &user_key, const std::string &value,
uint64_t expire_ms, bool *flag);
+ rocksdb::Status SetXX(const std::string &user_key, const std::string &value,
uint64_t expire_ms, bool *flag);
rocksdb::Status SetRange(const std::string &user_key, size_t offset, const
std::string &value, uint64_t *new_size);
rocksdb::Status IncrBy(const std::string &user_key, int64_t increment,
int64_t *new_value);
rocksdb::Status IncrByFloat(const std::string &user_key, double increment,
double *new_value);
std::vector<rocksdb::Status> MGet(const std::vector<Slice> &keys,
std::vector<std::string> *values);
- rocksdb::Status MSet(const std::vector<StringPair> &pairs, uint64_t expire,
bool lock = true);
- rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, uint64_t
expire, bool *flag);
+ rocksdb::Status MSet(const std::vector<StringPair> &pairs, uint64_t
expire_ms, bool lock = true);
+ rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, uint64_t
expire_ms, bool *flag);
rocksdb::Status CAS(const std::string &user_key, const std::string
&old_value, const std::string &new_value,
- uint64_t expire, int *flag);
+ uint64_t expire_ms, int *flag);
rocksdb::Status CAD(const std::string &user_key, const std::string &value,
int *flag);
rocksdb::Status LCS(const std::string &user_key1, const std::string
&user_key2, StringLCSArgs args,
StringLCSResult *rst);
private:
rocksdb::Status getValue(const std::string &ns_key, std::string *value);
- rocksdb::Status getValueAndExpire(const std::string &ns_key, std::string
*value, uint64_t *expire);
+ rocksdb::Status getValueAndExpire(const std::string &ns_key, std::string
*value, uint64_t *expire_ms);
std::vector<rocksdb::Status> getValues(const std::vector<Slice> &ns_keys,
std::vector<std::string> *values);
rocksdb::Status getRawValue(const std::string &ns_key, std::string
*raw_value);
std::vector<rocksdb::Status> getRawValues(const std::vector<Slice> &keys,
std::vector<std::string> *raw_values);