This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ad5dde96 feat: support histograms for command latency statistics 
(#2721)
ad5dde96 is described below

commit ad5dde963da3b9a0e48e654dda6fb689f816bb89
Author: Rabun Kosar <[email protected]>
AuthorDate: Tue Jan 21 18:41:07 2025 -0800

    feat: support histograms for command latency statistics (#2721)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: hulk <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 kvrocks.conf                        | 26 +++++++++++++++++++-------
 src/config/config.cc                | 20 ++++++++++++++++++++
 src/config/config.h                 |  3 +++
 src/server/server.cc                | 32 +++++++++++++++++++++++++++++++-
 src/stats/stats.cc                  | 14 +++++++++++++-
 src/stats/stats.h                   | 16 +++++++++++++++-
 tests/cppunit/config_test.cc        |  2 ++
 tests/gocase/unit/info/info_test.go | 23 ++++++++++++++++++++---
 8 files changed, 123 insertions(+), 13 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 16a980e5..461d077e 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -20,13 +20,13 @@ bind 127.0.0.1
 # unixsocket /tmp/kvrocks.sock
 # unixsocketperm 777
 
-# Allows a parent process to open a socket and pass its FD down to kvrocks as 
a child 
+# Allows a parent process to open a socket and pass its FD down to kvrocks as 
a child
 # process. Useful to reserve a port and prevent race conditions.
-# 
-# PLEASE NOTE: 
-# If this is overridden to a value other than -1, the bind and tls* directives 
will be 
+#
+# PLEASE NOTE:
+# If this is overridden to a value other than -1, the bind and tls* directives 
will be
 # ignored.
-# 
+#
 # Default: -1 (not overridden, defer to creating a connection to the specified 
port)
 socket-fd -1
 
@@ -369,10 +369,22 @@ json-storage-format json
 # NOTE: This is an experimental feature. If you find errors, performance 
degradation,
 # excessive memory usage, excessive disk I/O, etc. after enabling it, please 
try disabling it.
 # At the same time, we welcome feedback on related issues to help iterative 
improvements.
-# 
+#
 # Default: no
 txn-context-enabled no
 
+# Define the histogram bucket values.
+#
+# If enabled, these boundary values will be used to store the command execution
+# latency values in the buckets defined below. The values may be integers or
+# doubles, and must be sorted in ascending order.
+# An implicit bucket (+Inf in Prometheus jargon) will be added to track the
+# highest values that are beyond the bucket limits.
+#
+# NOTE: This is an experimental feature. There might be some performance
+# overhead when using this feature, so please be aware.
+# Default: disabled
+# histogram-bucket-boundaries  
10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000
+
 ################################## TLS ###################################
 
 # By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
@@ -1031,7 +1043,7 @@ rocksdb.partition_filters yes
 # Specifies the maximum size in bytes for a write batch in RocksDB.
 # If set to 0, there is no size limit for write batches.
 # This option can help control memory usage and manage large WriteBatch 
operations more effectively.
-# 
+#
 # Default: 0
 # rocksdb.write_options.write_batch_max_bytes 0
 
diff --git a/src/config/config.cc b/src/config/config.cc
index fe7e3fb0..284f47e9 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -239,6 +239,7 @@ Config::Config() {
        new EnumField<JsonStorageFormat>(&json_storage_format, 
json_storage_formats, JsonStorageFormat::JSON)},
       {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, 
false)},
       {"skip-block-cache-deallocation-on-close", false, new 
YesNoField(&skip_block_cache_deallocation_on_close, false)},
+      {"histogram-bucket-boundaries", true, new 
StringField(&histogram_bucket_boundaries_str_, "")},
 
       /* rocksdb options */
       {"rocksdb.compression", false,
@@ -754,6 +755,25 @@ void Config::initFieldCallback() {
       {"tls-session-cache-size", set_tls_option},
       {"tls-session-cache-timeout", set_tls_option},
 #endif
+      {"histogram-bucket-boundaries",
+       [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string 
&k, const std::string &v) -> Status {
+         std::vector<std::string> buckets = util::Split(v, ",");
+         histogram_bucket_boundaries.clear();
+         if (buckets.size() < 1) {
+           return Status::OK();
+         }
+         for (const auto &bucket_val : buckets) {
+           auto parse_result = ParseFloat<double>(bucket_val);
+           if (!parse_result) {
+             return {Status::NotOK, "The values in the bucket list must be 
double or integer."};
+           }
+           histogram_bucket_boundaries.push_back(*parse_result);
+         }
+         if (!std::is_sorted(histogram_bucket_boundaries.begin(), 
histogram_bucket_boundaries.end())) {
+           return {Status::NotOK, "The values for the histogram must be 
sorted."};
+         }
+         return Status::OK();
+       }},
   };
   for (const auto &iter : callbacks) {
     auto field_iter = fields_.find(iter.first);
diff --git a/src/config/config.h b/src/config/config.h
index 9fa5d416..7c76759c 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -177,6 +177,8 @@ struct Config {
 
   bool skip_block_cache_deallocation_on_close = false;
 
+  std::vector<double> histogram_bucket_boundaries;
+
   struct RocksDB {
     int block_size;
     bool cache_index_and_filter_blocks;
@@ -260,6 +262,7 @@ struct Config {
   std::string profiling_sample_commands_str_;
   std::map<std::string, std::unique_ptr<ConfigField>> fields_;
   std::vector<std::string> rename_command_;
+  std::string histogram_bucket_boundaries_str_;
 
   void initFieldValidator();
   void initFieldCallback();
diff --git a/src/server/server.cc b/src/server/server.cc
index 5b52eb33..e809444b 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -52,7 +52,8 @@
 #include "worker.h"
 
 Server::Server(engine::Storage *storage, Config *config)
-    : storage(storage),
+    : stats(config->histogram_bucket_boundaries),
+      storage(storage),
       indexer(storage),
       index_mgr(&indexer, storage),
       start_time_secs_(util::GetTimeStamp()),
@@ -60,9 +61,19 @@ Server::Server(engine::Storage *storage, Config *config)
       namespace_(storage) {
   // init commands stats here to prevent concurrent insert, and cause core
   auto commands = redis::CommandTable::GetOriginal();
+
   for (const auto &iter : *commands) {
     stats.commands_stats[iter.first].calls = 0;
     stats.commands_stats[iter.first].latency = 0;
+
+    if (stats.bucket_boundaries.size() > 0) {
+      // NB: Extra index for the last bucket (Inf)
+      for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) {
+        
stats.commands_histogram[iter.first].buckets.push_back(std::make_unique<std::atomic<uint64_t>>(0));
+      }
+      stats.commands_histogram[iter.first].calls = 0;
+      stats.commands_histogram[iter.first].sum = 0;
+    }
   }
 
   // init cursor_dict_
@@ -1165,6 +1176,25 @@ void Server::GetCommandsStatsInfo(std::string *info) {
                   << ",usec_per_call=" << static_cast<float>(latency / calls) 
<< "\r\n";
   }
 
+  for (const auto &cmd_hist : stats.commands_histogram) {
+    auto command_name = cmd_hist.first;
+    auto calls = stats.commands_histogram[command_name].calls.load();
+    if (calls == 0) continue;
+
+    auto sum = stats.commands_histogram[command_name].sum.load();
+    string_stream << "cmdstathist_" << command_name << ":";
+    for (std::size_t i{0}; i < 
stats.commands_histogram[command_name].buckets.size(); ++i) {
+      auto bucket_value = 
stats.commands_histogram[command_name].buckets[i]->load();
+      auto bucket_bound = std::numeric_limits<double>::infinity();
+      if (i < stats.bucket_boundaries.size()) {
+        bucket_bound = stats.bucket_boundaries[i];
+      }
+
+      string_stream << bucket_bound << "=" << bucket_value << ",";
+    }
+    string_stream << "sum=" << sum << ",count=" << calls << "\r\n";
+  }
+
   *info = string_stream.str();
 }
 
diff --git a/src/stats/stats.cc b/src/stats/stats.cc
index ae18638b..5baea34d 100644
--- a/src/stats/stats.cc
+++ b/src/stats/stats.cc
@@ -26,7 +26,7 @@
 #include "fmt/format.h"
 #include "time_util.h"
 
-Stats::Stats() {
+Stats::Stats(std::vector<double> bucket_boundaries) : 
bucket_boundaries(std::move(bucket_boundaries)) {
   for (int i = 0; i < STATS_METRIC_COUNT; i++) {
     InstMetric im;
     im.last_sample_time_ms = 0;
@@ -86,10 +86,22 @@ int64_t Stats::GetMemoryRSS() {
 void Stats::IncrCalls(const std::string &command_name) {
   total_calls.fetch_add(1, std::memory_order_relaxed);
   commands_stats[command_name].calls.fetch_add(1, std::memory_order_relaxed);
+
+  if (bucket_boundaries.size() > 0) {
+    commands_histogram[command_name].calls.fetch_add(1, 
std::memory_order_relaxed);
+  }
 }
 
 void Stats::IncrLatency(uint64_t latency, const std::string &command_name) {
   commands_stats[command_name].latency.fetch_add(latency, 
std::memory_order_relaxed);
+
+  if (bucket_boundaries.size() > 0) {
+    commands_histogram[command_name].sum.fetch_add(latency, 
std::memory_order_relaxed);
+
+    const auto bucket_index = static_cast<std::size_t>(std::distance(
+        bucket_boundaries.begin(), std::lower_bound(bucket_boundaries.begin(), 
bucket_boundaries.end(), latency)));
+    commands_histogram[command_name].buckets[bucket_index]->fetch_add(1, 
std::memory_order_relaxed);
+  }
 }
 
 void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
diff --git a/src/stats/stats.h b/src/stats/stats.h
index e00506a9..0bae042d 100644
--- a/src/stats/stats.h
+++ b/src/stats/stats.h
@@ -22,8 +22,10 @@
 
 #include <unistd.h>
 
+#include <algorithm>
 #include <atomic>
 #include <map>
+#include <memory>
 #include <shared_mutex>
 #include <string>
 #include <vector>
@@ -43,6 +45,13 @@ enum StatsMetricFlags {
 
 constexpr int STATS_METRIC_SAMPLES = 16;  // Number of samples per metric
 
+// Experimental part to support histograms for cmd statistics
+struct CommandHistogram {
+  std::vector<std::unique_ptr<std::atomic<uint64_t>>> buckets;
+  std::atomic<uint64_t> calls;
+  std::atomic<uint64_t> sum;
+};
+
 struct CommandStat {
   std::atomic<uint64_t> calls;
   std::atomic<uint64_t> latency;
@@ -69,7 +78,12 @@ class Stats {
   std::atomic<uint64_t> psync_ok_count = {0};
   std::map<std::string, CommandStat> commands_stats;
 
-  Stats();
+  using BucketBoundaries = std::vector<double>;
+  BucketBoundaries bucket_boundaries;
+  std::map<std::string, CommandHistogram> commands_histogram;
+
+  explicit Stats(std::vector<double> histogram_bucket_boundaries);
+
   void IncrCalls(const std::string &command_name);
   void IncrLatency(uint64_t latency, const std::string &command_name);
   void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, 
std::memory_order_relaxed); }
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index 4e9b3817..2a6ac1b5 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -130,6 +130,8 @@ TEST(Config, GetAndSet) {
       {"rocksdb.rate_limiter_auto_tuned", "yes"},
       {"rocksdb.compression_level", "32767"},
       {"rocksdb.wal_compression", "no"},
+      {"histogram-bucket-boundaries", "10,100,1000,10000"},
+
   };
   for (const auto &iter : immutable_cases) {
     s = config.Set(nullptr, iter.first, iter.second);
diff --git a/tests/gocase/unit/info/info_test.go 
b/tests/gocase/unit/info/info_test.go
index dd4b0cdc..128e5774 100644
--- a/tests/gocase/unit/info/info_test.go
+++ b/tests/gocase/unit/info/info_test.go
@@ -23,17 +23,20 @@ import (
        "context"
        "fmt"
        "strconv"
+       "strings"
        "testing"
        "time"
 
-       "github.com/redis/go-redis/v9"
-
        "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
        "github.com/stretchr/testify/require"
 )
 
 func TestInfo(t *testing.T) {
-       srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+       srv0 := util.StartServer(t, map[string]string{
+               "cluster-enabled":             "yes",
+               "histogram-bucket-boundaries": "10,20,30,50",
+       })
        defer func() { srv0.Close() }()
        rdb0 := srv0.NewClient()
        defer func() { require.NoError(t, rdb0.Close()) }()
@@ -102,6 +105,20 @@ func TestInfo(t *testing.T) {
        t.Run("get cluster information by INFO - cluster enabled", func(t 
*testing.T) {
                require.Equal(t, "1", util.FindInfoEntry(rdb0, 
"cluster_enabled", "cluster"))
        })
+
+       t.Run("get command latencies via histogram INFO - 
histogram-bucket-boundaries", func(t *testing.T) {
+               output := util.FindInfoEntry(rdb0, "cmdstathist_info", 
"commandstats")
+
+               splitValues := strings.FieldsFunc(output, func(r rune) bool {
+                       return r == '=' || r == ','
+               })
+
+               // expected: 10=..,20=..,30=..,50=..,inf=..,sum=...,count=..
+               require.GreaterOrEqual(t, len(splitValues), 14)
+               require.Contains(t, splitValues, "sum")
+               require.Contains(t, splitValues, "count")
+               require.Contains(t, splitValues, "inf")
+       })
 }
 
 func TestKeyspaceHitMiss(t *testing.T) {

Reply via email to