This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new ad5dde96 feat: support histograms for command latency statistics
(#2721)
ad5dde96 is described below
commit ad5dde963da3b9a0e48e654dda6fb689f816bb89
Author: Rabun Kosar <[email protected]>
AuthorDate: Tue Jan 21 18:41:07 2025 -0800
feat: support histograms for command latency statistics (#2721)
Co-authored-by: Twice <[email protected]>
Co-authored-by: hulk <[email protected]>
Co-authored-by: Twice <[email protected]>
---
kvrocks.conf | 26 +++++++++++++++++++-------
src/config/config.cc | 20 ++++++++++++++++++++
src/config/config.h | 3 +++
src/server/server.cc | 32 +++++++++++++++++++++++++++++++-
src/stats/stats.cc | 14 +++++++++++++-
src/stats/stats.h | 16 +++++++++++++++-
tests/cppunit/config_test.cc | 2 ++
tests/gocase/unit/info/info_test.go | 23 ++++++++++++++++++++---
8 files changed, 123 insertions(+), 13 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index 16a980e5..461d077e 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -20,13 +20,13 @@ bind 127.0.0.1
# unixsocket /tmp/kvrocks.sock
# unixsocketperm 777
-# Allows a parent process to open a socket and pass its FD down to kvrocks as
a child
+# Allows a parent process to open a socket and pass its FD down to kvrocks as
a child
# process. Useful to reserve a port and prevent race conditions.
-#
-# PLEASE NOTE:
-# If this is overridden to a value other than -1, the bind and tls* directives
will be
+#
+# PLEASE NOTE:
+# If this is overridden to a value other than -1, the bind and tls* directives
will be
# ignored.
-#
+#
# Default: -1 (not overridden, defer to creating a connection to the specified
port)
socket-fd -1
@@ -369,10 +369,22 @@ json-storage-format json
# NOTE: This is an experimental feature. If you find errors, performance
degradation,
# excessive memory usage, excessive disk I/O, etc. after enabling it, please
try disabling it.
# At the same time, we welcome feedback on related issues to help iterative
improvements.
-#
+#
# Default: no
txn-context-enabled no
+# Define the histogram bucket values.
+#
+# When set, each command's execution latency is counted into the buckets
+# bounded by these values. Values must be numeric and sorted ascending.
+# An implicit last bucket (+Inf in Prometheus jargon) is added to count
+# latencies beyond the largest boundary.
+
+# NOTE: This is an experimental feature. Enabling it may add some
+# performance overhead.
+# Default: disabled
+# histogram-bucket-boundaries
10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000
+
################################## TLS ###################################
# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
@@ -1031,7 +1043,7 @@ rocksdb.partition_filters yes
# Specifies the maximum size in bytes for a write batch in RocksDB.
# If set to 0, there is no size limit for write batches.
# This option can help control memory usage and manage large WriteBatch
operations more effectively.
-#
+#
# Default: 0
# rocksdb.write_options.write_batch_max_bytes 0
diff --git a/src/config/config.cc b/src/config/config.cc
index fe7e3fb0..284f47e9 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -239,6 +239,7 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format,
json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled,
false)},
{"skip-block-cache-deallocation-on-close", false, new
YesNoField(&skip_block_cache_deallocation_on_close, false)},
+ {"histogram-bucket-boundaries", true, new
StringField(&histogram_bucket_boundaries_str_, "")},
/* rocksdb options */
{"rocksdb.compression", false,
@@ -754,6 +755,25 @@ void Config::initFieldCallback() {
{"tls-session-cache-size", set_tls_option},
{"tls-session-cache-timeout", set_tls_option},
#endif
+ {"histogram-bucket-boundaries",
+ [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string
&k, const std::string &v) -> Status {
+ std::vector<std::string> buckets = util::Split(v, ",");
+ histogram_bucket_boundaries.clear();
+ if (buckets.size() < 1) {
+ return Status::OK();
+ }
+ for (const auto &bucket_val : buckets) {
+ auto parse_result = ParseFloat<double>(bucket_val);
+ if (!parse_result) {
+ return {Status::NotOK, "The values in the bucket list must be
double or integer."};
+ }
+ histogram_bucket_boundaries.push_back(*parse_result);
+ }
+ if (!std::is_sorted(histogram_bucket_boundaries.begin(),
histogram_bucket_boundaries.end())) {
+ return {Status::NotOK, "The values for the histogram must be
sorted."};
+ }
+ return Status::OK();
+ }},
};
for (const auto &iter : callbacks) {
auto field_iter = fields_.find(iter.first);
diff --git a/src/config/config.h b/src/config/config.h
index 9fa5d416..7c76759c 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -177,6 +177,8 @@ struct Config {
bool skip_block_cache_deallocation_on_close = false;
+ std::vector<double> histogram_bucket_boundaries;
+
struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
@@ -260,6 +262,7 @@ struct Config {
std::string profiling_sample_commands_str_;
std::map<std::string, std::unique_ptr<ConfigField>> fields_;
std::vector<std::string> rename_command_;
+ std::string histogram_bucket_boundaries_str_;
void initFieldValidator();
void initFieldCallback();
diff --git a/src/server/server.cc b/src/server/server.cc
index 5b52eb33..e809444b 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -52,7 +52,8 @@
#include "worker.h"
Server::Server(engine::Storage *storage, Config *config)
- : storage(storage),
+ : stats(config->histogram_bucket_boundaries),
+ storage(storage),
indexer(storage),
index_mgr(&indexer, storage),
start_time_secs_(util::GetTimeStamp()),
@@ -60,9 +61,19 @@ Server::Server(engine::Storage *storage, Config *config)
namespace_(storage) {
// init commands stats here to prevent concurrent insert, and cause core
auto commands = redis::CommandTable::GetOriginal();
+
for (const auto &iter : *commands) {
stats.commands_stats[iter.first].calls = 0;
stats.commands_stats[iter.first].latency = 0;
+
+ if (stats.bucket_boundaries.size() > 0) {
+ // NB: Extra index for the last bucket (Inf)
+ for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) {
+
stats.commands_histogram[iter.first].buckets.push_back(std::make_unique<std::atomic<uint64_t>>(0));
+ }
+ stats.commands_histogram[iter.first].calls = 0;
+ stats.commands_histogram[iter.first].sum = 0;
+ }
}
// init cursor_dict_
@@ -1165,6 +1176,25 @@ void Server::GetCommandsStatsInfo(std::string *info) {
<< ",usec_per_call=" << static_cast<float>(latency / calls)
<< "\r\n";
}
+ for (const auto &cmd_hist : stats.commands_histogram) {
+ auto command_name = cmd_hist.first;
+ auto calls = stats.commands_histogram[command_name].calls.load();
+ if (calls == 0) continue;
+
+ auto sum = stats.commands_histogram[command_name].sum.load();
+ string_stream << "cmdstathist_" << command_name << ":";
+ for (std::size_t i{0}; i <
stats.commands_histogram[command_name].buckets.size(); ++i) {
+ auto bucket_value =
stats.commands_histogram[command_name].buckets[i]->load();
+ auto bucket_bound = std::numeric_limits<double>::infinity();
+ if (i < stats.bucket_boundaries.size()) {
+ bucket_bound = stats.bucket_boundaries[i];
+ }
+
+ string_stream << bucket_bound << "=" << bucket_value << ",";
+ }
+ string_stream << "sum=" << sum << ",count=" << calls << "\r\n";
+ }
+
*info = string_stream.str();
}
diff --git a/src/stats/stats.cc b/src/stats/stats.cc
index ae18638b..5baea34d 100644
--- a/src/stats/stats.cc
+++ b/src/stats/stats.cc
@@ -26,7 +26,7 @@
#include "fmt/format.h"
#include "time_util.h"
-Stats::Stats() {
+Stats::Stats(std::vector<double> bucket_boundaries) :
bucket_boundaries(std::move(bucket_boundaries)) {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
im.last_sample_time_ms = 0;
@@ -86,10 +86,22 @@ int64_t Stats::GetMemoryRSS() {
void Stats::IncrCalls(const std::string &command_name) {
total_calls.fetch_add(1, std::memory_order_relaxed);
commands_stats[command_name].calls.fetch_add(1, std::memory_order_relaxed);
+
+ if (bucket_boundaries.size() > 0) {
+ commands_histogram[command_name].calls.fetch_add(1,
std::memory_order_relaxed);
+ }
}
void Stats::IncrLatency(uint64_t latency, const std::string &command_name) {
commands_stats[command_name].latency.fetch_add(latency,
std::memory_order_relaxed);
+
+ if (bucket_boundaries.size() > 0) {
+ commands_histogram[command_name].sum.fetch_add(latency,
std::memory_order_relaxed);
+
+ const auto bucket_index = static_cast<std::size_t>(std::distance(
+ bucket_boundaries.begin(), std::lower_bound(bucket_boundaries.begin(),
bucket_boundaries.end(), latency)));
+ commands_histogram[command_name].buckets[bucket_index]->fetch_add(1,
std::memory_order_relaxed);
+ }
}
void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
diff --git a/src/stats/stats.h b/src/stats/stats.h
index e00506a9..0bae042d 100644
--- a/src/stats/stats.h
+++ b/src/stats/stats.h
@@ -22,8 +22,10 @@
#include <unistd.h>
+#include <algorithm>
#include <atomic>
#include <map>
+#include <memory>
#include <shared_mutex>
#include <string>
#include <vector>
@@ -43,6 +45,13 @@ enum StatsMetricFlags {
constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric
+// Experimental part to support histograms for cmd statistics
+struct CommandHistogram {
+ std::vector<std::unique_ptr<std::atomic<uint64_t>>> buckets;
+ std::atomic<uint64_t> calls;
+ std::atomic<uint64_t> sum;
+};
+
struct CommandStat {
std::atomic<uint64_t> calls;
std::atomic<uint64_t> latency;
@@ -69,7 +78,12 @@ class Stats {
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;
- Stats();
+ using BucketBoundaries = std::vector<double>;
+ BucketBoundaries bucket_boundaries;
+ std::map<std::string, CommandHistogram> commands_histogram;
+
+ explicit Stats(std::vector<double> histogram_bucket_boundaries);
+
void IncrCalls(const std::string &command_name);
void IncrLatency(uint64_t latency, const std::string &command_name);
void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes,
std::memory_order_relaxed); }
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index 4e9b3817..2a6ac1b5 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -130,6 +130,8 @@ TEST(Config, GetAndSet) {
{"rocksdb.rate_limiter_auto_tuned", "yes"},
{"rocksdb.compression_level", "32767"},
{"rocksdb.wal_compression", "no"},
+ {"histogram-bucket-boundaries", "10,100,1000,10000"},
+
};
for (const auto &iter : immutable_cases) {
s = config.Set(nullptr, iter.first, iter.second);
diff --git a/tests/gocase/unit/info/info_test.go
b/tests/gocase/unit/info/info_test.go
index dd4b0cdc..128e5774 100644
--- a/tests/gocase/unit/info/info_test.go
+++ b/tests/gocase/unit/info/info_test.go
@@ -23,17 +23,20 @@ import (
"context"
"fmt"
"strconv"
+ "strings"
"testing"
"time"
- "github.com/redis/go-redis/v9"
-
"github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)
func TestInfo(t *testing.T) {
- srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ srv0 := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "histogram-bucket-boundaries": "10,20,30,50",
+ })
defer func() { srv0.Close() }()
rdb0 := srv0.NewClient()
defer func() { require.NoError(t, rdb0.Close()) }()
@@ -102,6 +105,20 @@ func TestInfo(t *testing.T) {
t.Run("get cluster information by INFO - cluster enabled", func(t
*testing.T) {
require.Equal(t, "1", util.FindInfoEntry(rdb0,
"cluster_enabled", "cluster"))
})
+
+ t.Run("get command latencies via histogram INFO -
histogram-bucket-boundaries", func(t *testing.T) {
+ output := util.FindInfoEntry(rdb0, "cmdstathist_info",
"commandstats")
+
+ splitValues := strings.FieldsFunc(output, func(r rune) bool {
+ return r == '=' || r == ','
+ })
+
+ // expected: 10=..,20=..,30=..,50=..,inf=..,sum=...,count=..
+ require.GreaterOrEqual(t, len(splitValues), 14)
+ require.Contains(t, splitValues, "sum")
+ require.Contains(t, splitValues, "count")
+ require.Contains(t, splitValues, "inf")
+ })
}
func TestKeyspaceHitMiss(t *testing.T) {