This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 93ce7925 Trying to add the unit of time in variable name (#2276)
93ce7925 is described below

commit 93ce7925b26d619ad81c077191ddd69a7fb55d42
Author: mwish <[email protected]>
AuthorDate: Tue May 7 14:01:25 2024 +0800

    Trying to add the unit of time in variable name (#2276)
    
    Co-authored-by: hulk <[email protected]>
---
 src/cluster/replication.cc      |  2 +-
 src/cluster/replication.h       |  4 +--
 src/commands/cmd_replication.cc |  8 ++---
 src/commands/cmd_stream.cc      |  6 ++--
 src/common/cron.cc              |  2 +-
 src/common/cron.h               |  2 +-
 src/common/time_util.h          |  1 +
 src/server/server.cc            | 73 +++++++++++++++++++++--------------------
 src/server/server.h             | 13 ++++----
 src/stats/stats.cc              |  8 ++---
 src/stats/stats.h               |  4 +--
 src/storage/rdb.cc              | 21 ++++++------
 src/storage/storage.cc          | 33 ++++++++++---------
 src/storage/storage.h           | 15 +++++----
 src/types/redis_stream.cc       | 24 +++++++-------
 src/types/redis_stream_base.h   |  6 ++--
 src/types/redis_string.cc       | 20 +++++------
 src/types/redis_string.h        | 15 +++++----
 18 files changed, 132 insertions(+), 125 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 4df05a47..57d8b9bc 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -201,7 +201,7 @@ LOOP_LABEL:
   assert(handler_idx_ <= handlers_.size());
   DLOG(INFO) << "[replication] Execute handler[" << 
getHandlerName(handler_idx_) << "]";
   auto st = getHandlerFunc(handler_idx_)(repl_, bev);
-  repl_->last_io_time_.store(util::GetTimeStamp(), std::memory_order_relaxed);
+  repl_->last_io_time_secs_.store(util::GetTimeStamp(), 
std::memory_order_relaxed);
   switch (st) {
     case CBState::NEXT:
       ++handler_idx_;
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index b7f49717..b223bd6a 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -98,7 +98,7 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
   Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> 
&&post_fullsync_cb);
   void Stop();
   ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
-  time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }
+  int64_t LastIOTimeSecs() const { return 
last_io_time_secs_.load(std::memory_order_relaxed); }
 
   void TimerCB(int, int16_t);
 
@@ -155,7 +155,7 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
   Server *srv_ = nullptr;
   engine::Storage *storage_ = nullptr;
   std::atomic<ReplState> repl_state_;
-  std::atomic<time_t> last_io_time_ = 0;
+  std::atomic<int64_t> last_io_time_secs_ = 0;
   bool next_try_old_psync_ = false;
   bool next_try_without_announce_ip_address_ = false;
 
diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc
index 0a86a9cc..6beffea8 100644
--- a/src/commands/cmd_replication.cc
+++ b/src/commands/cmd_replication.cc
@@ -242,8 +242,8 @@ class CommandFetchMeta : public Commander {
       } else {
         LOG(WARNING) << "[replication] Fail to send full data file info " << 
ip << ", error: " << strerror(errno);
       }
-      auto now = static_cast<time_t>(util::GetTimeStamp());
-      srv->storage->SetCheckpointAccessTime(now);
+      auto now_secs = static_cast<time_t>(util::GetTimeStamp());
+      srv->storage->SetCheckpointAccessTimeSecs(now_secs);
     }));
 
     if (auto s = util::ThreadDetach(t); !s) {
@@ -311,8 +311,8 @@ class CommandFetchFile : public Commander {
           usleep(shortest - duration);
         }
       }
-      auto now = static_cast<time_t>(util::GetTimeStamp());
-      srv->storage->SetCheckpointAccessTime(now);
+      auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
+      srv->storage->SetCheckpointAccessTimeSecs(now_secs);
       srv->DecrFetchFileThread();
     }));
 
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f8736168..a5fdf795 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -588,7 +588,7 @@ class CommandXInfo : public Commander {
     }
 
     output->append(redis::MultiLen(result_vector.size()));
-    auto now = util::GetTimeStampMS();
+    auto now_ms = util::GetTimeStampMS();
     for (auto const &it : result_vector) {
       output->append(conn->HeaderOfMap(4));
       output->append(redis::BulkString("name"));
@@ -596,9 +596,9 @@ class CommandXInfo : public Commander {
       output->append(redis::BulkString("pending"));
       output->append(redis::Integer(it.second.pending_number));
       output->append(redis::BulkString("idle"));
-      output->append(redis::Integer(now - it.second.last_idle));
+      output->append(redis::Integer(now_ms - it.second.last_idle_ms));
       output->append(redis::BulkString("inactive"));
-      output->append(redis::Integer(now - it.second.last_active));
+      output->append(redis::Integer(now_ms - it.second.last_active_ms));
     }
 
     return Status::OK();
diff --git a/src/common/cron.cc b/src/common/cron.cc
index f4c223bf..2c4a03ba 100644
--- a/src/common/cron.cc
+++ b/src/common/cron.cc
@@ -52,7 +52,7 @@ Status Cron::SetScheduleTime(const std::vector<std::string> 
&args) {
   return Status::OK();
 }
 
-bool Cron::IsTimeMatch(tm *tm) {
+bool Cron::IsTimeMatch(const tm *tm) {
   if (tm->tm_min == last_tm_.tm_min && tm->tm_hour == last_tm_.tm_hour && 
tm->tm_mday == last_tm_.tm_mday &&
       tm->tm_mon == last_tm_.tm_mon && tm->tm_wday == last_tm_.tm_wday) {
     return false;
diff --git a/src/common/cron.h b/src/common/cron.h
index cba6d275..5385a0ef 100644
--- a/src/common/cron.h
+++ b/src/common/cron.h
@@ -43,7 +43,7 @@ class Cron {
   ~Cron() = default;
 
   Status SetScheduleTime(const std::vector<std::string> &args);
-  bool IsTimeMatch(tm *tm);
+  bool IsTimeMatch(const tm *tm);
   std::string ToString() const;
   bool IsEnabled() const;
 
diff --git a/src/common/time_util.h b/src/common/time_util.h
index 1c8dc7b6..9eb6daa4 100644
--- a/src/common/time_util.h
+++ b/src/common/time_util.h
@@ -24,6 +24,7 @@
 
 namespace util {
 
+/// Get the system timestamp in seconds, milliseconds or microseconds.
 template <typename Duration = std::chrono::seconds>
 auto GetTimeStamp() {
   return 
std::chrono::duration_cast<Duration>(std::chrono::system_clock::now().time_since_epoch()).count();
diff --git a/src/server/server.cc b/src/server/server.cc
index e482aefb..7c50a008 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -52,7 +52,7 @@
 #include "worker.h"
 
 Server::Server(engine::Storage *storage, Config *config)
-    : storage(storage), start_time_(util::GetTimeStamp()), config_(config), 
namespace_(storage) {
+    : storage(storage), start_time_secs_(util::GetTimeStamp()), 
config_(config), namespace_(storage) {
   // init commands stats here to prevent concurrent insert, and cause core
   auto commands = redis::CommandTable::GetOriginal();
   for (const auto &iter : *commands) {
@@ -179,7 +179,7 @@ Status Server::Start() {
 
   compaction_checker_thread_ = GET_OR_RET(util::CreateThread("compact-check", 
[this] {
     uint64_t counter = 0;
-    time_t last_compact_date = 0;
+    int64_t last_compact_date = 0;
     CompactionChecker compaction_checker{this->storage};
 
     while (!stop_) {
@@ -192,11 +192,9 @@ Status Server::Start() {
 
       if (!is_loading_ && ++counter % 600 == 0  // check every minute
           && config_->compaction_checker_range.Enabled()) {
-        auto now = static_cast<time_t>(util::GetTimeStamp());
-        std::tm local_time{};
-        localtime_r(&now, &local_time);
-        if (local_time.tm_hour >= config_->compaction_checker_range.start &&
-            local_time.tm_hour <= config_->compaction_checker_range.stop) {
+        auto now_hours = util::GetTimeStamp<std::chrono::hours>();
+        if (now_hours >= config_->compaction_checker_range.start &&
+            now_hours <= config_->compaction_checker_range.stop) {
           std::vector<std::string> cf_names = 
{engine::kMetadataColumnFamilyName, engine::kSubkeyColumnFamilyName,
                                                
engine::kZSetScoreColumnFamilyName, engine::kStreamColumnFamilyName};
           for (const auto &cf_name : cf_names) {
@@ -204,8 +202,8 @@ Status Server::Start() {
           }
         }
         // compact once per day
-        if (now != 0 && last_compact_date != now / 86400) {
-          last_compact_date = now / 86400;
+        if (now_hours != 0 && last_compact_date != now_hours / 24) {
+          last_compact_date = now_hours / 24;
           compaction_checker.CompactPropagateAndPubSubFiles();
         }
       }
@@ -344,9 +342,9 @@ void Server::CleanupExitedSlaves() {
 void Server::FeedMonitorConns(redis::Connection *conn, const 
std::vector<std::string> &tokens) {
   if (monitor_clients_ <= 0) return;
 
-  auto now = util::GetTimeStampUS();
+  auto now_us = util::GetTimeStampUS();
   std::string output =
-      fmt::format("{}.{} [{} {}]", now / 1000000, now % 1000000, 
conn->GetNamespace(), conn->GetAddr());
+      fmt::format("{}.{} [{} {}]", now_us / 1000000, now_us % 1000000, 
conn->GetNamespace(), conn->GetAddr());
   for (const auto &token : tokens) {
     output += " \"";
     output += util::EscapeString(token);
@@ -674,7 +672,7 @@ void Server::OnEntryAddedToStream(const std::string &ns, 
const std::string &key,
   }
 }
 
-void Server::updateCachedTime() { unix_time.store(util::GetTimeStamp()); }
+void Server::updateCachedTime() { unix_time_secs.store(util::GetTimeStamp()); }
 
 int Server::IncrClientNum() {
   total_clients_.fetch_add(1, std::memory_order_relaxed);
@@ -787,13 +785,14 @@ void Server::cron() {
 
     // No replica uses this checkpoint, we can remove it.
     if (counter != 0 && counter % 100 == 0) {
-      time_t create_time = storage->GetCheckpointCreateTime();
-      time_t access_time = storage->GetCheckpointAccessTime();
+      int64_t create_time_secs = storage->GetCheckpointCreateTimeSecs();
+      int64_t access_time_secs = storage->GetCheckpointAccessTimeSecs();
 
       if (storage->ExistCheckpoint()) {
         // TODO(shooterit): support to config the alive time of checkpoint
-        auto now = static_cast<time_t>(util::GetTimeStamp());
-        if ((GetFetchFileThreadNum() == 0 && now - access_time > 30) || (now - 
create_time > 24 * 60 * 60)) {
+        int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
+        if ((GetFetchFileThreadNum() == 0 && now_secs - access_time_secs > 30) 
||
+            (now_secs - create_time_secs > 24 * 60 * 60)) {
           auto s = rocksdb::DestroyDB(config_->checkpoint_dir, 
rocksdb::Options());
           if (!s.ok()) {
             LOG(WARNING) << "[server] Fail to clean checkpoint, error: " << 
s.ToString();
@@ -963,9 +962,9 @@ void Server::GetServerInfo(std::string *info) {
   string_stream << "arch_bits:" << sizeof(void *) * 8 << "\r\n";
   string_stream << "process_id:" << getpid() << "\r\n";
   string_stream << "tcp_port:" << config_->port << "\r\n";
-  int64_t now = util::GetTimeStamp();
-  string_stream << "uptime_in_seconds:" << now - start_time_ << "\r\n";
-  string_stream << "uptime_in_days:" << (now - start_time_) / 86400 << "\r\n";
+  int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
+  string_stream << "uptime_in_seconds:" << now_secs - start_time_secs_ << 
"\r\n";
+  string_stream << "uptime_in_days:" << (now_secs - start_time_secs_) / 86400 
<< "\r\n";
   *info = string_stream.str();
 }
 
@@ -1000,14 +999,14 @@ void Server::GetReplicationInfo(std::string *info) {
   string_stream << "# Replication\r\n";
   string_stream << "role:" << (IsSlave() ? "slave" : "master") << "\r\n";
   if (IsSlave()) {
-    time_t now = util::GetTimeStamp();
+    int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
     string_stream << "master_host:" << master_host_ << "\r\n";
     string_stream << "master_port:" << master_port_ << "\r\n";
     ReplState state = GetReplicationState();
     string_stream << "master_link_status:" << (state == kReplConnected ? "up" 
: "down") << "\r\n";
     string_stream << "master_sync_unrecoverable_error:" << (state == 
kReplError ? "yes" : "no") << "\r\n";
     string_stream << "master_sync_in_progress:" << (state == kReplFetchMeta || 
state == kReplFetchSST) << "\r\n";
-    string_stream << "master_last_io_seconds_ago:" << now - 
replication_thread_->LastIOTime() << "\r\n";
+    string_stream << "master_last_io_seconds_ago:" << now_secs - 
replication_thread_->LastIOTimeSecs() << "\r\n";
     string_stream << "slave_repl_offset:" << storage->LatestSeqNumber() << 
"\r\n";
     string_stream << "slave_priority:" << config_->slave_priority << "\r\n";
   }
@@ -1091,15 +1090,15 @@ void Server::SetLastRandomKeyCursor(const std::string 
&cursor) {
 }
 
 int64_t Server::GetCachedUnixTime() {
-  if (unix_time.load() == 0) {
+  if (unix_time_secs.load() == 0) {
     updateCachedTime();
   }
-  return unix_time.load();
+  return unix_time_secs.load();
 }
 
 int64_t Server::GetLastBgsaveTime() {
   std::lock_guard<std::mutex> lg(db_job_mu_);
-  return last_bgsave_time_ == -1 ? start_time_ : last_bgsave_time_;
+  return last_bgsave_timestamp_secs_ == -1 ? start_time_secs_ : 
last_bgsave_timestamp_secs_;
 }
 
 void Server::GetStatsInfo(std::string *info) {
@@ -1141,7 +1140,7 @@ void Server::GetCommandsStatsInfo(std::string *info) {
 
     auto latency = cmd_stat.second.latency.load();
     string_stream << "cmdstat_" << cmd_stat.first << ":calls=" << calls << 
",usec=" << latency
-                  << ",usec_per_call=" << ((calls == 0) ? 0 : 
static_cast<float>(latency / calls)) << "\r\n";
+                  << ",usec_per_call=" << static_cast<float>(latency / calls) 
<< "\r\n";
   }
 
   *info = string_stream.str();
@@ -1195,9 +1194,10 @@ void Server::GetInfo(const std::string &ns, const 
std::string &section, std::str
 
     std::lock_guard<std::mutex> lg(db_job_mu_);
     string_stream << "bgsave_in_progress:" << (is_bgsave_in_progress_ ? 1 : 0) 
<< "\r\n";
-    string_stream << "last_bgsave_time:" << (last_bgsave_time_ == -1 ? 
start_time_ : last_bgsave_time_) << "\r\n";
+    string_stream << "last_bgsave_time:"
+                  << (last_bgsave_timestamp_secs_ == -1 ? start_time_secs_ : 
last_bgsave_timestamp_secs_) << "\r\n";
     string_stream << "last_bgsave_status:" << last_bgsave_status_ << "\r\n";
-    string_stream << "last_bgsave_time_sec:" << last_bgsave_time_sec_ << 
"\r\n";
+    string_stream << "last_bgsave_time_sec:" << last_bgsave_duration_secs_ << 
"\r\n";
   }
 
   if (all || section == "stats") {
@@ -1249,8 +1249,9 @@ void Server::GetInfo(const std::string &ns, const 
std::string &section, std::str
     KeyNumStats stats;
     GetLatestKeyNumStats(ns, &stats);
 
-    time_t last_scan_time = GetLastScanTime(ns);
-    tm last_scan_tm{};
+    // FIXME(mwish): output still requires std::tm.
+    auto last_scan_time = static_cast<time_t>(GetLastScanTime(ns));
+    std::tm last_scan_tm{};
     localtime_r(&last_scan_time, &last_scan_tm);
 
     if (section_cnt++) string_stream << "\r\n";
@@ -1393,15 +1394,15 @@ Status Server::AsyncBgSaveDB() {
   is_bgsave_in_progress_ = true;
 
   return task_runner_.TryPublish([this] {
-    auto start_bgsave_time = util::GetTimeStamp();
+    auto start_bgsave_time_secs = util::GetTimeStamp<std::chrono::seconds>();
     Status s = storage->CreateBackup();
-    auto stop_bgsave_time = util::GetTimeStamp();
+    auto stop_bgsave_time_secs = util::GetTimeStamp<std::chrono::seconds>();
 
     std::lock_guard<std::mutex> lg(db_job_mu_);
     is_bgsave_in_progress_ = false;
-    last_bgsave_time_ = start_bgsave_time;
+    last_bgsave_timestamp_secs_ = start_bgsave_time_secs;
     last_bgsave_status_ = s.IsOK() ? "ok" : "err";
-    last_bgsave_time_sec_ = stop_bgsave_time - start_bgsave_time;
+    last_bgsave_duration_secs_ = stop_bgsave_time_secs - 
start_bgsave_time_secs;
   });
 }
 
@@ -1436,7 +1437,7 @@ Status Server::AsyncScanDBSize(const std::string &ns) {
     std::lock_guard<std::mutex> lg(db_job_mu_);
 
     db_scan_infos_[ns].key_num_stats = stats;
-    db_scan_infos_[ns].last_scan_time = util::GetTimeStamp();
+    db_scan_infos_[ns].last_scan_time_secs = util::GetTimeStamp();
     db_scan_infos_[ns].is_scanning = false;
   });
 }
@@ -1529,10 +1530,10 @@ void Server::GetLatestKeyNumStats(const std::string 
&ns, KeyNumStats *stats) {
   }
 }
 
-time_t Server::GetLastScanTime(const std::string &ns) {
+int64_t Server::GetLastScanTime(const std::string &ns) const {
   auto iter = db_scan_infos_.find(ns);
   if (iter != db_scan_infos_.end()) {
-    return iter->second.last_scan_time;
+    return iter->second.last_scan_time_secs;
   }
   return 0;
 }
diff --git a/src/server/server.h b/src/server/server.h
index a0f0477b..ad967c77 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -56,7 +56,8 @@
 constexpr const char *REDIS_VERSION = "4.0.0";
 
 struct DBScanInfo {
-  time_t last_scan_time = 0;
+  // Last scan system clock in seconds
+  int64_t last_scan_time_secs = 0;
   KeyNumStats key_num_stats;
   bool is_scanning = false;
 };
@@ -249,7 +250,7 @@ class Server {
   Status AsyncPurgeOldBackups(uint32_t num_backups_to_keep, uint32_t 
backup_max_keep_hours);
   Status AsyncScanDBSize(const std::string &ns);
   void GetLatestKeyNumStats(const std::string &ns, KeyNumStats *stats);
-  time_t GetLastScanTime(const std::string &ns);
+  int64_t GetLastScanTime(const std::string &ns) const;
 
   std::string GenerateCursorFromKeyName(const std::string &key_name, 
CursorType cursor_type, const char *prefix = "");
   std::string GetKeyNameFromCursor(const std::string &cursor, CursorType 
cursor_type);
@@ -294,7 +295,7 @@ class Server {
   Stats stats;
   engine::Storage *storage;
   std::unique_ptr<Cluster> cluster;
-  static inline std::atomic<int64_t> unix_time = 0;
+  static inline std::atomic<int64_t> unix_time_secs = 0;
   std::unique_ptr<SlotMigrator> slot_migrator;
   std::unique_ptr<SlotImport> slot_import;
 
@@ -325,7 +326,7 @@ class Server {
 
   std::atomic<bool> stop_ = false;
   std::atomic<bool> is_loading_ = false;
-  int64_t start_time_;
+  int64_t start_time_secs_;
   std::mutex slaveof_mu_;
   std::string master_host_;
   uint32_t master_port_ = 0;
@@ -355,9 +356,9 @@ class Server {
   std::mutex db_job_mu_;
   bool db_compacting_ = false;
   bool is_bgsave_in_progress_ = false;
-  int64_t last_bgsave_time_ = -1;
+  int64_t last_bgsave_timestamp_secs_ = -1;
   std::string last_bgsave_status_ = "ok";
-  int64_t last_bgsave_time_sec_ = -1;
+  int64_t last_bgsave_duration_secs_ = -1;
 
   std::map<std::string, DBScanInfo> db_scan_infos_;
 
diff --git a/src/stats/stats.cc b/src/stats/stats.cc
index 115fc4d9..ae18638b 100644
--- a/src/stats/stats.cc
+++ b/src/stats/stats.cc
@@ -29,7 +29,7 @@
 Stats::Stats() {
   for (int i = 0; i < STATS_METRIC_COUNT; i++) {
     InstMetric im;
-    im.last_sample_time = 0;
+    im.last_sample_time_ms = 0;
     im.last_sample_count = 0;
     im.idx = 0;
     for (uint64_t &sample : im.samples) {
@@ -93,15 +93,15 @@ void Stats::IncrLatency(uint64_t latency, const std::string 
&command_name) {
 }
 
 void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
-  uint64_t curr_time = util::GetTimeStampMS();
+  uint64_t curr_time_ms = util::GetTimeStampMS();
   std::unique_lock<std::shared_mutex> lock(inst_metrics_mutex);
-  uint64_t t = curr_time - inst_metrics[metric].last_sample_time;
+  uint64_t t = curr_time_ms - inst_metrics[metric].last_sample_time_ms;
   uint64_t ops = current_reading - inst_metrics[metric].last_sample_count;
   uint64_t ops_sec = t > 0 ? (ops * 1000 / t) : 0;
   inst_metrics[metric].samples[inst_metrics[metric].idx] = ops_sec;
   inst_metrics[metric].idx++;
   inst_metrics[metric].idx %= STATS_METRIC_SAMPLES;
-  inst_metrics[metric].last_sample_time = curr_time;
+  inst_metrics[metric].last_sample_time_ms = curr_time_ms;
   inst_metrics[metric].last_sample_count = current_reading;
 }
 
diff --git a/src/stats/stats.h b/src/stats/stats.h
index 88ab2108..6fdba09a 100644
--- a/src/stats/stats.h
+++ b/src/stats/stats.h
@@ -49,8 +49,8 @@ struct CommandStat {
 };
 
 struct InstMetric {
-  uint64_t last_sample_time;   // Timestamp of the last sample in ms
-  uint64_t last_sample_count;  // Count in the last sample
+  uint64_t last_sample_time_ms;  // Timestamp of the last sample in ms
+  uint64_t last_sample_count;    // Count in the last sample
   uint64_t samples[STATS_METRIC_SAMPLES];
   int idx;
 };
diff --git a/src/storage/rdb.cc b/src/storage/rdb.cc
index 79c0b8c8..f513caca 100644
--- a/src/storage/rdb.cc
+++ b/src/storage/rdb.cc
@@ -459,11 +459,11 @@ Status RDB::saveRdbObject(int type, const std::string 
&key, const RedisObjValue
   if (type == RDBTypeString) {
     const auto &value = std::get<std::string>(obj);
     redis::String string_db(storage_, ns_);
-    uint64_t expire = 0;
+    uint64_t expire_ms = 0;
     if (ttl_ms > 0) {
-      expire = ttl_ms + util::GetTimeStampMS();
+      expire_ms = ttl_ms + util::GetTimeStampMS();
     }
-    db_status = string_db.SetEX(key, value, expire);
+    db_status = string_db.SetEX(key, value, expire_ms);
   } else if (type == RDBTypeSet || type == RDBTypeSetIntSet || type == 
RDBTypeSetListPack) {
     const auto &members = std::get<std::vector<std::string>>(obj);
     redis::Set set_db(storage_, ns_);
@@ -567,21 +567,20 @@ Status RDB::LoadRdb(uint32_t db_index, bool 
overwrite_exist_key) {
     return {Status::NotOK, fmt::format("Can't handle RDB format version {}", 
rdb_ver)};
   }
 
-  uint64_t expire_time = 0;
+  uint64_t expire_time_ms = 0;
   int64_t expire_keys = 0;
   int64_t load_keys = 0;
   int64_t empty_keys_skipped = 0;
-  auto now = util::GetTimeStampMS();
+  auto now_ms = util::GetTimeStampMS();
   uint32_t db_id = 0;
   uint64_t skip_exist_keys = 0;
   while (true) {
     auto type = GET_OR_RET(LogWhenError(loadRdbType()));
     if (type == RDBOpcodeExpireTime) {
-      expire_time = 
static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds())));
-      expire_time *= 1000;
+      expire_time_ms = 
static_cast<uint64_t>(GET_OR_RET(LogWhenError(loadExpiredTimeSeconds()))) * 
1000;
       continue;
     } else if (type == RDBOpcodeExpireTimeMs) {
-      expire_time = 
GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
+      expire_time_ms = 
GET_OR_RET(LogWhenError(loadExpiredTimeMilliseconds(rdb_ver)));
       continue;
     } else if (type == RDBOpcodeFreq) {               // LFU frequency: not 
use in kvrocks
       GET_OR_RET(LogWhenError(stream_->ReadByte()));  // discard the value
@@ -637,8 +636,8 @@ Status RDB::LoadRdb(uint32_t db_index, bool 
overwrite_exist_key) {
         LOG(WARNING) << "skipping empty key: " << key;
       }
       continue;
-    } else if (expire_time != 0 &&
-               expire_time < now) {  // in redis this used to feed this 
deletion to any connected replicas
+    } else if (expire_time_ms != 0 &&
+               expire_time_ms < now_ms) {  // in redis this used to feed this 
deletion to any connected replicas
       expire_keys++;
       continue;
     }
@@ -655,7 +654,7 @@ Status RDB::LoadRdb(uint32_t db_index, bool 
overwrite_exist_key) {
       }
     }
 
-    auto ret = saveRdbObject(type, key, value, expire_time);
+    auto ret = saveRdbObject(type, key, value, expire_time_ms);
     if (!ret.IsOK()) {
       LOG(WARNING) << "save rdb object key " << key << " failed: " << 
ret.Msg();
     } else {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 1759d2e7..b25ae8a8 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -67,7 +67,7 @@ constexpr double kRocksdbLRUBlockCacheHighPriPoolRatio = 0.75;
 constexpr double kRocksdbLRURowCacheHighPriPoolRatio = 0.5;
 
 // used in creating rocksdb::HyperClockCache, set`estimated_entry_charge` to 0 
means let rocksdb dynamically and
-// automacally adjust the table size for the cache.
+// automatically adjust the table size for the cache.
 constexpr size_t kRockdbHCCAutoAdjustCharge = 0;
 
 const int64_t kIORateLimitMaxMb = 1024000;
@@ -75,7 +75,7 @@ const int64_t kIORateLimitMaxMb = 1024000;
 using rocksdb::Slice;
 
 Storage::Storage(Config *config)
-    : backup_creating_time_(util::GetTimeStamp()),
+    : backup_creating_time_secs_(util::GetTimeStamp<std::chrono::seconds>()),
       env_(rocksdb::Env::Default()),
       config_(config),
       lock_mgr_(16),
@@ -421,8 +421,8 @@ Status Storage::CreateBackup(uint64_t *sequence_number) {
     return {Status::NotOK, s.ToString()};
   }
 
-  // 'backup_mu_' can guarantee 'backup_creating_time_' is thread-safe
-  backup_creating_time_ = static_cast<time_t>(util::GetTimeStamp());
+  // 'backup_mu_' can guarantee 'backup_creating_time_secs_' is thread-safe
+  backup_creating_time_secs_ = util::GetTimeStamp<std::chrono::seconds>();
 
   LOG(INFO) << "[storage] Success to create new backup";
   return Status::OK();
@@ -546,7 +546,7 @@ void Storage::EmptyDB() {
 }
 
 void Storage::PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t 
backup_max_keep_hours) {
-  time_t now = util::GetTimeStamp();
+  auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
   std::lock_guard<std::mutex> lg(config_->backup_mu);
   std::string task_backup_dir = config_->backup_dir;
 
@@ -555,13 +555,14 @@ void Storage::PurgeOldBackups(uint32_t 
num_backups_to_keep, uint32_t backup_max_
   if (!s.ok()) return;
 
   // No backup is needed to keep or the backup is expired, we will clean it.
-  bool backup_expired = (backup_max_keep_hours != 0 && backup_creating_time_ + 
backup_max_keep_hours * 3600 < now);
+  bool backup_expired =
+      (backup_max_keep_hours != 0 && backup_creating_time_secs_ + 
backup_max_keep_hours * 3600 < now_secs);
   if (num_backups_to_keep == 0 || backup_expired) {
     s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options());
     if (s.ok()) {
-      LOG(INFO) << "[storage] Succeeded cleaning old backup that was created 
at " << backup_creating_time_;
+      LOG(INFO) << "[storage] Succeeded cleaning old backup that was created 
at " << backup_creating_time_secs_;
     } else {
-      LOG(INFO) << "[storage] Failed cleaning old backup that was created at " 
<< backup_creating_time_
+      LOG(INFO) << "[storage] Failed cleaning old backup that was created at " 
<< backup_creating_time_secs_
                 << ". Error: " << s.ToString();
     }
   }
@@ -975,9 +976,9 @@ Status 
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
     uint64_t checkpoint_latest_seq = 0;
     s = checkpoint->CreateCheckpoint(data_files_dir, 
storage->config_->rocks_db.write_buffer_size * MiB,
                                      &checkpoint_latest_seq);
-    auto now = static_cast<time_t>(util::GetTimeStamp());
-    storage->checkpoint_info_.create_time = now;
-    storage->checkpoint_info_.access_time = now;
+    auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
+    storage->checkpoint_info_.create_time_secs = now_secs;
+    storage->checkpoint_info_.access_time_secs = now_secs;
     storage->checkpoint_info_.latest_seq = checkpoint_latest_seq;
     if (!s.ok()) {
       LOG(WARNING) << "[storage] Failed to create checkpoint (snapshot). 
Error: " << s.ToString();
@@ -987,12 +988,12 @@ Status 
Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
     LOG(INFO) << "[storage] Create checkpoint successfully";
   } else {
     // Replicas can share checkpoint to replication if the checkpoint existing 
time is less than a half of WAL TTL.
-    int64_t can_shared_time = storage->config_->rocks_db.wal_ttl_seconds / 2;
-    if (can_shared_time > 60 * 60) can_shared_time = 60 * 60;
-    if (can_shared_time < 10 * 60) can_shared_time = 10 * 60;
+    int64_t can_shared_time_secs = storage->config_->rocks_db.wal_ttl_seconds 
/ 2;
+    if (can_shared_time_secs > 60 * 60) can_shared_time_secs = 60 * 60;
+    if (can_shared_time_secs < 10 * 60) can_shared_time_secs = 10 * 60;
 
-    auto now = static_cast<time_t>(util::GetTimeStamp());
-    if (now - storage->GetCheckpointCreateTime() > can_shared_time) {
+    auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
+    if (now_secs - storage->GetCheckpointCreateTimeSecs() > 
can_shared_time_secs) {
       LOG(WARNING) << "[storage] Can't use current checkpoint, waiting next 
checkpoint";
       return {Status::NotOK, "Can't use current checkpoint, waiting for next 
checkpoint"};
     }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 208499b5..09151baa 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -215,8 +215,10 @@ class Storage {
     static int OpenDataFile(Storage *storage, const std::string &rel_file, 
uint64_t *file_size);
     static Status CleanInvalidFiles(Storage *storage, const std::string &dir, 
std::vector<std::string> valid_files);
     struct CheckpointInfo {
-      std::atomic<time_t> create_time = 0;
-      std::atomic<time_t> access_time = 0;
+      // System clock time when the checkpoint was created.
+      std::atomic<int64_t> create_time_secs = 0;
+      // System clock time when the checkpoint was last accessed.
+      std::atomic<int64_t> access_time_secs = 0;
       uint64_t latest_seq = 0;
     };
 
@@ -238,9 +240,9 @@ class Storage {
 
   bool ExistCheckpoint();
   bool ExistSyncCheckpoint();
-  time_t GetCheckpointCreateTime() const { return 
checkpoint_info_.create_time; }
-  void SetCheckpointAccessTime(time_t t) { checkpoint_info_.access_time = t; }
-  time_t GetCheckpointAccessTime() const { return 
checkpoint_info_.access_time; }
+  int64_t GetCheckpointCreateTimeSecs() const { return 
checkpoint_info_.create_time_secs; }
+  void SetCheckpointAccessTimeSecs(int64_t t) { 
checkpoint_info_.access_time_secs = t; }
+  int64_t GetCheckpointAccessTimeSecs() const { return 
checkpoint_info_.access_time_secs; }
   void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = 
yes_or_no; }
   bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; }
 
@@ -251,7 +253,8 @@ class Storage {
  private:
   std::unique_ptr<rocksdb::DB> db_ = nullptr;
   std::string replid_;
-  time_t backup_creating_time_;
+  // The system clock time when the backup was created.
+  int64_t backup_creating_time_secs_;
   std::unique_ptr<rocksdb::BackupEngine> backup_ = nullptr;
   rocksdb::Env *env_;
   std::shared_ptr<rocksdb::SstFileManager> sst_file_manager_;
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 1b9444ed..6c66f800 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -237,8 +237,8 @@ std::string 
Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
 std::string Stream::encodeStreamConsumerMetadataValue(const 
StreamConsumerMetadata &consumer_metadata) {
   std::string dst;
   PutFixed64(&dst, consumer_metadata.pending_number);
-  PutFixed64(&dst, consumer_metadata.last_idle);
-  PutFixed64(&dst, consumer_metadata.last_active);
+  PutFixed64(&dst, consumer_metadata.last_idle_ms);
+  PutFixed64(&dst, consumer_metadata.last_active_ms);
   return dst;
 }
 
@@ -246,8 +246,8 @@ StreamConsumerMetadata 
Stream::decodeStreamConsumerMetadataValue(const std::stri
   StreamConsumerMetadata consumer_metadata;
   rocksdb::Slice input(value);
   GetFixed64(&input, &consumer_metadata.pending_number);
-  GetFixed64(&input, &consumer_metadata.last_idle);
-  GetFixed64(&input, &consumer_metadata.last_active);
+  GetFixed64(&input, &consumer_metadata.last_idle_ms);
+  GetFixed64(&input, &consumer_metadata.last_active_ms);
   return consumer_metadata;
 }
 
@@ -277,7 +277,7 @@ StreamEntryID 
Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std:
 
 std::string Stream::encodeStreamPelEntryValue(const StreamPelEntry &pel_entry) 
{
   std::string dst;
-  PutFixed64(&dst, pel_entry.last_delivery_time);
+  PutFixed64(&dst, pel_entry.last_delivery_time_ms);
   PutFixed64(&dst, pel_entry.last_delivery_count);
   PutFixed64(&dst, pel_entry.consumer_name.size());
   dst += pel_entry.consumer_name;
@@ -287,7 +287,7 @@ std::string Stream::encodeStreamPelEntryValue(const 
StreamPelEntry &pel_entry) {
 StreamPelEntry Stream::decodeStreamPelEntryValue(const std::string &value) {
   StreamPelEntry pel_entry;
   rocksdb::Slice input(value);
-  GetFixed64(&input, &pel_entry.last_delivery_time);
+  GetFixed64(&input, &pel_entry.last_delivery_time_ms);
   GetFixed64(&input, &pel_entry.last_delivery_count);
   uint64_t consumer_name_len = 0;
   GetFixed64(&input, &consumer_name_len);
@@ -487,8 +487,8 @@ rocksdb::Status Stream::createConsumerWithoutLock(const 
Slice &stream_name, cons
 
   StreamConsumerMetadata consumer_metadata;
   auto now = util::GetTimeStampMS();
-  consumer_metadata.last_idle = now;
-  consumer_metadata.last_active = now;
+  consumer_metadata.last_idle_ms = now;
+  consumer_metadata.last_active_ms = now;
   std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
   std::string consumer_value = 
encodeStreamConsumerMetadataValue(consumer_metadata);
   std::string get_consumer_value;
@@ -1153,9 +1153,9 @@ rocksdb::Status Stream::RangeWithPending(const Slice 
&stream_name, StreamRangeOp
     return s;
   }
   StreamConsumerMetadata consumer_metadata = 
decodeStreamConsumerMetadataValue(get_consumer_value);
-  auto now = util::GetTimeStampMS();
-  consumer_metadata.last_idle = now;
-  consumer_metadata.last_active = now;
+  auto now_ms = util::GetTimeStampMS();
+  consumer_metadata.last_idle_ms = now_ms;
+  consumer_metadata.last_active_ms = now_ms;
 
   if (latest) {
     options.start = consumergroup_metadata.last_delivered_id;
@@ -1219,7 +1219,7 @@ rocksdb::Status Stream::RangeWithPending(const Slice 
&stream_name, StreamRangeOp
         }
         entries->emplace_back(entry_id.ToString(), std::move(values));
         pel_entry.last_delivery_count += 1;
-        pel_entry.last_delivery_time = now;
+        pel_entry.last_delivery_time_ms = now_ms;
         batch->Put(stream_cf_handle_, iter->key(), 
encodeStreamPelEntryValue(pel_entry));
         ++count;
         if (count >= options.count) break;
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 60d54d23..ae90a3e5 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -171,8 +171,8 @@ struct StreamConsumerGroupMetadata {
 
 struct StreamConsumerMetadata {
   uint64_t pending_number = 0;
-  uint64_t last_idle;
-  uint64_t last_active;
+  uint64_t last_idle_ms;
+  uint64_t last_active_ms;
 };
 
 enum class StreamSubkeyType {
@@ -183,7 +183,7 @@ enum class StreamSubkeyType {
 };
 
 struct StreamPelEntry {
-  uint64_t last_delivery_time;
+  uint64_t last_delivery_time_ms;
   uint64_t last_delivery_count;
   std::string consumer_name;
 };
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index e0f0c99d..0f0d4688 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -255,21 +255,21 @@ rocksdb::Status String::Set(const std::string &user_key, 
const std::string &valu
   return updateRawValue(ns_key, new_raw_value);
 }
 
-rocksdb::Status String::SetEX(const std::string &user_key, const std::string 
&value, uint64_t expire) {
+rocksdb::Status String::SetEX(const std::string &user_key, const std::string 
&value, uint64_t expire_ms) {
   std::optional<std::string> ret;
-  return Set(user_key, value, {expire, StringSetType::NONE, /*get=*/false, 
/*keep_ttl=*/false}, ret);
+  return Set(user_key, value, {expire_ms, StringSetType::NONE, /*get=*/false, 
/*keep_ttl=*/false}, ret);
 }
 
-rocksdb::Status String::SetNX(const std::string &user_key, const std::string 
&value, uint64_t expire, bool *flag) {
+rocksdb::Status String::SetNX(const std::string &user_key, const std::string 
&value, uint64_t expire_ms, bool *flag) {
   std::optional<std::string> ret;
-  auto s = Set(user_key, value, {expire, StringSetType::NX, /*get=*/false, 
/*keep_ttl=*/false}, ret);
+  auto s = Set(user_key, value, {expire_ms, StringSetType::NX, /*get=*/false, 
/*keep_ttl=*/false}, ret);
   *flag = ret.has_value();
   return s;
 }
 
-rocksdb::Status String::SetXX(const std::string &user_key, const std::string 
&value, uint64_t expire, bool *flag) {
+rocksdb::Status String::SetXX(const std::string &user_key, const std::string 
&value, uint64_t expire_ms, bool *flag) {
   std::optional<std::string> ret;
-  auto s = Set(user_key, value, {expire, StringSetType::XX, /*get=*/false, 
/*keep_ttl=*/false}, ret);
+  auto s = Set(user_key, value, {expire_ms, StringSetType::XX, /*get=*/false, 
/*keep_ttl=*/false}, ret);
   *flag = ret.has_value();
   return s;
 }
@@ -384,7 +384,7 @@ rocksdb::Status String::IncrByFloat(const std::string 
&user_key, double incremen
   return updateRawValue(ns_key, raw_value);
 }
 
-rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t 
expire, bool lock) {
+rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t 
expire_ms, bool lock) {
   // Data race, key string maybe overwrite by other key while didn't lock the 
keys here,
   // to improve the set performance
   std::optional<MultiLockGuard> guard;
@@ -404,7 +404,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair> 
&pairs, uint64_t expi
   for (const auto &pair : pairs) {
     std::string bytes;
     Metadata metadata(kRedisString, false);
-    metadata.expire = expire;
+    metadata.expire = expire_ms;
     metadata.Encode(&bytes);
     bytes.append(pair.value.data(), pair.value.size());
     std::string ns_key = AppendNamespacePrefix(pair.key);
@@ -413,7 +413,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair> 
&pairs, uint64_t expi
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
-rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t 
expire, bool *flag) {
+rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t 
expire_ms, bool *flag) {
   *flag = false;
 
   int exists = 0;
@@ -435,7 +435,7 @@ rocksdb::Status String::MSetNX(const 
std::vector<StringPair> &pairs, uint64_t ex
     return rocksdb::Status::OK();
   }
 
-  rocksdb::Status s = MSet(pairs, /*expire=*/expire, /*lock=*/false);
+  rocksdb::Status s = MSet(pairs, /*expire_ms=*/expire_ms, /*lock=*/false);
   if (!s.ok()) return s;
 
   *flag = true;
diff --git a/src/types/redis_string.h b/src/types/redis_string.h
index faf30259..34afb0bd 100644
--- a/src/types/redis_string.h
+++ b/src/types/redis_string.h
@@ -37,6 +37,7 @@ struct StringPair {
 enum class StringSetType { NONE, NX, XX };
 
 struct StringSetArgs {
+  // Expire time in mill seconds.
   uint64_t expire;
   StringSetType type;
   bool get;
@@ -85,24 +86,24 @@ class String : public Database {
   rocksdb::Status Set(const std::string &user_key, const std::string &value);
   rocksdb::Status Set(const std::string &user_key, const std::string &value, 
StringSetArgs args,
                       std::optional<std::string> &ret);
-  rocksdb::Status SetEX(const std::string &user_key, const std::string &value, 
uint64_t expire);
-  rocksdb::Status SetNX(const std::string &user_key, const std::string &value, 
uint64_t expire, bool *flag);
-  rocksdb::Status SetXX(const std::string &user_key, const std::string &value, 
uint64_t expire, bool *flag);
+  rocksdb::Status SetEX(const std::string &user_key, const std::string &value, 
uint64_t expire_ms);
+  rocksdb::Status SetNX(const std::string &user_key, const std::string &value, 
uint64_t expire_ms, bool *flag);
+  rocksdb::Status SetXX(const std::string &user_key, const std::string &value, 
uint64_t expire_ms, bool *flag);
   rocksdb::Status SetRange(const std::string &user_key, size_t offset, const 
std::string &value, uint64_t *new_size);
   rocksdb::Status IncrBy(const std::string &user_key, int64_t increment, 
int64_t *new_value);
   rocksdb::Status IncrByFloat(const std::string &user_key, double increment, 
double *new_value);
   std::vector<rocksdb::Status> MGet(const std::vector<Slice> &keys, 
std::vector<std::string> *values);
-  rocksdb::Status MSet(const std::vector<StringPair> &pairs, uint64_t expire, 
bool lock = true);
-  rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, uint64_t 
expire, bool *flag);
+  rocksdb::Status MSet(const std::vector<StringPair> &pairs, uint64_t 
expire_ms, bool lock = true);
+  rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, uint64_t 
expire_ms, bool *flag);
   rocksdb::Status CAS(const std::string &user_key, const std::string 
&old_value, const std::string &new_value,
-                      uint64_t expire, int *flag);
+                      uint64_t expire_ms, int *flag);
   rocksdb::Status CAD(const std::string &user_key, const std::string &value, 
int *flag);
   rocksdb::Status LCS(const std::string &user_key1, const std::string 
&user_key2, StringLCSArgs args,
                       StringLCSResult *rst);
 
  private:
   rocksdb::Status getValue(const std::string &ns_key, std::string *value);
-  rocksdb::Status getValueAndExpire(const std::string &ns_key, std::string 
*value, uint64_t *expire);
+  rocksdb::Status getValueAndExpire(const std::string &ns_key, std::string 
*value, uint64_t *expire_ms);
   std::vector<rocksdb::Status> getValues(const std::vector<Slice> &ns_keys, 
std::vector<std::string> *values);
   rocksdb::Status getRawValue(const std::string &ns_key, std::string 
*raw_value);
   std::vector<rocksdb::Status> getRawValues(const std::vector<Slice> &keys, 
std::vector<std::string> *raw_values);


Reply via email to