This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 79103ee4b feat(server): add threads count and disk space to sysinfo stats (#2917)
79103ee4b is described below
commit 79103ee4b26681f1b152d258d679cb91196f26cc
Author: xin <[email protected]>
AuthorDate: Wed Mar 18 00:20:35 2026 +0900
feat(server): add threads count and disk space to sysinfo stats (#2917)
---
core/cli/src/commands/binary_system/stats.rs | 23 ++++
core/common/src/traits/binary_mapper.rs | 38 ++++++
core/common/src/types/stats/mod.rs | 9 ++
.../tests/cli/system/test_stats_command.rs | 130 ++++++++++++++++++++-
core/server/src/binary/mapper.rs | 4 +
core/server/src/shard/system/stats.rs | 21 ++++
.../src/shard/tasks/periodic/sysinfo_printer.rs | 12 +-
7 files changed, 232 insertions(+), 5 deletions(-)
diff --git a/core/cli/src/commands/binary_system/stats.rs b/core/cli/src/commands/binary_system/stats.rs
index 595013f32..c63e6ef48 100644
--- a/core/cli/src/commands/binary_system/stats.rs
+++ b/core/cli/src/commands/binary_system/stats.rs
@@ -159,6 +159,19 @@ impl CliCommand for GetStatsCmd {
format!("{}", stats.consumer_groups_count).as_str(),
]);
+ table.add_row(vec![
+ "Threads Count",
+ format!("{}", stats.threads_count).as_str(),
+ ]);
+ table.add_row(vec![
+ "Free Disk Space",
+ stats.free_disk_space.as_bytes_u64().to_string().as_str(),
+ ]);
+ table.add_row(vec![
+ "Total Disk Space",
+ stats.total_disk_space.as_bytes_u64().to_string().as_str(),
+ ]);
+
table.add_row(vec!["OS Name", stats.os_name.as_str()]);
table.add_row(vec!["OS Version", stats.os_version.as_str()]);
table.add_row(vec!["Kernel Version", stats.kernel_version.as_str()]);
@@ -208,6 +221,16 @@ impl CliCommand for GetStatsCmd {
stats.consumer_groups_count
));
+ list.push(format!("Threads Count|{}", stats.threads_count));
+ list.push(format!(
+ "Free Disk Space|{}",
+ stats.free_disk_space.as_bytes_u64()
+ ));
+ list.push(format!(
+ "Total Disk Space|{}",
+ stats.total_disk_space.as_bytes_u64()
+ ));
+
list.push(format!("OS Name|{}", stats.os_name));
list.push(format!("OS Version|{}", stats.os_version));
list.push(format!("Kernel Version|{}", stats.kernel_version));
diff --git a/core/common/src/traits/binary_mapper.rs b/core/common/src/traits/binary_mapper.rs
index f211febaf..52b3e6056 100644
--- a/core/common/src/traits/binary_mapper.rs
+++ b/core/common/src/traits/binary_mapper.rs
@@ -340,6 +340,41 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
}
}
+ let mut threads_count = 0u32;
+ if current_position + 4 <= payload.len() {
+ threads_count = u32::from_le_bytes(
+ payload[current_position..current_position + 4]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ );
+ current_position += 4;
+ }
+
+ let mut free_disk_space: IggyByteSize = 0.into();
+ if current_position + 8 <= payload.len() {
+ free_disk_space = u64::from_le_bytes(
+ payload[current_position..current_position + 8]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ )
+ .into();
+ current_position += 8;
+ }
+
+ let mut total_disk_space: IggyByteSize = 0.into();
+ if current_position + 8 <= payload.len() {
+ total_disk_space = u64::from_le_bytes(
+ payload[current_position..current_position + 8]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ )
+ .into();
+ #[allow(unused_assignments)]
+ {
+ current_position += 8;
+ }
+ }
+
Ok(Stats {
process_id,
cpu_usage,
@@ -366,6 +401,9 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
iggy_server_version,
iggy_server_semver,
cache_metrics,
+ threads_count,
+ free_disk_space,
+ total_disk_space,
})
}
diff --git a/core/common/src/types/stats/mod.rs b/core/common/src/types/stats/mod.rs
index e5e6a8f6b..e76edc0d3 100644
--- a/core/common/src/types/stats/mod.rs
+++ b/core/common/src/types/stats/mod.rs
@@ -74,6 +74,12 @@ pub struct Stats {
/// Cache metrics per partition
#[serde(with = "cache_metrics_serializer")]
pub cache_metrics: HashMap<CacheMetricsKey, CacheMetrics>,
+ /// The number of threads in the server process.
+ pub threads_count: u32,
+ /// The available (free) disk space for the data directory.
+ pub free_disk_space: IggyByteSize,
+ /// The total disk space for the data directory.
+ pub total_disk_space: IggyByteSize,
}
/// Key for identifying a specific partition's cache metrics
@@ -181,6 +187,9 @@ impl Default for Stats {
iggy_server_version: "unknown_iggy_version".to_string(),
iggy_server_semver: None,
cache_metrics: HashMap::new(),
+ threads_count: 0,
+ free_disk_space: 0.into(),
+ total_disk_space: 0.into(),
}
}
}
diff --git a/core/integration/tests/cli/system/test_stats_command.rs b/core/integration/tests/cli/system/test_stats_command.rs
index 2d4bd72e1..d3c6b5f75 100644
--- a/core/integration/tests/cli/system/test_stats_command.rs
+++ b/core/integration/tests/cli/system/test_stats_command.rs
@@ -24,10 +24,14 @@ use async_trait::async_trait;
use iggy::prelude::Client;
use iggy::prelude::Identifier;
use iggy::prelude::IggyExpiry;
+use iggy::prelude::IggyMessage;
use iggy::prelude::MaxTopicSize;
+use iggy::prelude::Partitioning;
use iggy_cli::commands::binary_system::stats::GetStatsOutput;
+use iggy_common::Stats;
use predicates::str::{contains, starts_with};
use serial_test::parallel;
+use std::str::FromStr;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum TestStatsCmdOutput {
@@ -97,7 +101,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains("Segments Count | 5"))
.stdout(contains("Message Count | 0"))
// Note: Client count can vary due to connection lifecycle; at least 2 expected
- .stdout(contains("Consumer Groups Count | 0"));
+ .stdout(contains("Consumer Groups Count | 0"))
+ .stdout(contains("Threads Count"))
+ .stdout(contains("Free Disk Space"))
+ .stdout(contains("Total Disk Space"));
}
TestStatsCmdOutput::Set(GetStatsOutput::List) => {
command_state
@@ -107,7 +114,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains("Partitions Count|5"))
.stdout(contains("Segments Count|5"))
.stdout(contains("Message Count|0"))
- .stdout(contains("Consumer Groups Count|0"));
+ .stdout(contains("Consumer Groups Count|0"))
+ .stdout(contains("Threads Count|"))
+ .stdout(contains("Free Disk Space|"))
+ .stdout(contains("Total Disk Space|"));
}
TestStatsCmdOutput::Set(GetStatsOutput::Json) => {
command_state
@@ -117,7 +127,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains(r#""partitions_count": 5"#))
.stdout(contains(r#""segments_count": 5"#))
.stdout(contains(r#""messages_count": 0"#))
- .stdout(contains(r#""consumer_groups_count": 0"#));
+ .stdout(contains(r#""consumer_groups_count": 0"#))
+ .stdout(contains(r#""threads_count":"#))
+ .stdout(contains(r#""free_disk_space":"#))
+ .stdout(contains(r#""total_disk_space":"#));
}
TestStatsCmdOutput::Set(GetStatsOutput::Toml) => {
command_state
@@ -127,7 +140,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains("partitions_count = 5"))
.stdout(contains("segments_count = 5"))
.stdout(contains("messages_count = 0"))
- .stdout(contains("consumer_groups_count = 0"));
+ .stdout(contains("consumer_groups_count = 0"))
+ .stdout(contains("threads_count ="))
+ .stdout(contains("free_disk_space ="))
+ .stdout(contains("total_disk_space ="));
}
}
}
@@ -143,6 +159,109 @@ impl IggyCmdTestCase for TestStatsCmd {
}
}
+struct TestStatsCmdWithMessages {
+ stream_id: u32,
+ topic_id: u32,
+}
+
+impl TestStatsCmdWithMessages {
+ fn new() -> Self {
+ Self {
+ stream_id: 0,
+ topic_id: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl IggyCmdTestCase for TestStatsCmdWithMessages {
+ async fn prepare_server_state(&mut self, client: &dyn Client) {
+ let stream = client.create_stream("size-test").await;
+ assert!(stream.is_ok());
+ let stream_details = stream.unwrap();
+ self.stream_id = stream_details.id;
+
+ let topic = client
+ .create_topic(
+ &self.stream_id.try_into().unwrap(),
+ "topic",
+ 1,
+ Default::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await;
+ assert!(topic.is_ok());
+ let topic_details = topic.unwrap();
+ self.topic_id = topic_details.id;
+
+ let mut messages = (1..=10)
+ .filter_map(|id| IggyMessage::from_str(format!("Test message {id}").as_str()).ok())
+ .collect::<Vec<_>>();
+ let send_status = client
+ .send_messages(
+ &self.stream_id.try_into().unwrap(),
+ &self.topic_id.try_into().unwrap(),
+ &Partitioning::default(),
+ &mut messages,
+ )
+ .await;
+ assert!(send_status.is_ok());
+ }
+
+ fn get_command(&self) -> IggyCmdCommand {
+ IggyCmdCommand::new()
+ .arg("stats")
+ .arg("-o")
+ .arg("json")
+ .opt("-q")
+ .with_env_credentials()
+ }
+
+ fn verify_command(&self, command_state: Assert) {
+ let assert = command_state.success();
+ let stdout = String::from_utf8_lossy(&assert.get_output().stdout);
+ let stats: Stats =
+ serde_json::from_str(&stdout).expect("Failed to parse stats JSON output");
+
+ assert!(
+ stats.messages_count > 0,
+ "messages_count should be > 0 after sending messages"
+ );
+ assert!(
+ stats.messages_size_bytes.as_bytes_u64() > 0,
+ "messages_size_bytes should be > 0 after sending messages"
+ );
+ assert!(
+ stats.free_disk_space.as_bytes_u64() > 0,
+ "free_disk_space should be > 0"
+ );
+ assert!(
+ stats.total_disk_space.as_bytes_u64() > 0,
+ "total_disk_space should be > 0"
+ );
+ assert!(
+ stats.free_disk_space.as_bytes_u64() <= stats.total_disk_space.as_bytes_u64(),
+ "free_disk_space should be <= total_disk_space"
+ );
+ }
+
+ async fn verify_server_state(&self, client: &dyn Client) {
+ client
+ .delete_topic(
+ &self.stream_id.try_into().unwrap(),
+ &self.topic_id.try_into().unwrap(),
+ )
+ .await
+ .unwrap();
+ client
+ .delete_stream(&self.stream_id.try_into().unwrap())
+ .await
+ .unwrap();
+ }
+}
+
#[tokio::test]
#[parallel]
pub async fn should_be_successful() {
@@ -172,6 +291,9 @@ pub async fn should_be_successful() {
GetStatsOutput::Toml,
)))
.await;
+ iggy_cmd_test
+ .execute_test(TestStatsCmdWithMessages::new())
+ .await;
}
#[tokio::test]
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index 819544aa9..d439700d5 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -72,6 +72,10 @@ pub fn map_stats(stats: &Stats) -> Bytes {
bytes.put_f32_le(metrics.hit_ratio);
}
+ bytes.put_u32_le(stats.threads_count);
+ bytes.put_u64_le(stats.free_disk_space.as_bytes_u64());
+ bytes.put_u64_le(stats.total_disk_space.as_bytes_u64());
+
bytes.freeze()
}
diff --git a/core/server/src/shard/system/stats.rs b/core/server/src/shard/system/stats.rs
index 03e01f6b2..dad687969 100644
--- a/core/server/src/shard/system/stats.rs
+++ b/core/server/src/shard/system/stats.rs
@@ -87,6 +87,8 @@ impl IggyShard {
let disk_usage = process.disk_usage();
stats.read_bytes = disk_usage.total_read_bytes.into();
stats.written_bytes = disk_usage.total_written_bytes.into();
+
+ stats.threads_count = process.tasks().map(|t| t.len() as u32).unwrap_or(0);
}
let (streams_count, topics_count, partitions_count, consumer_groups_count, stream_ids) =
@@ -118,6 +120,25 @@ impl IggyShard {
}
}
+ match fs2::available_space(&self.config.system.path) {
+ Ok(space) => stats.free_disk_space = space.into(),
+ Err(err) => {
+ tracing::warn!(
+ "Failed to get available disk space for '{}': {err}",
+ self.config.system.path
+ );
+ }
+ }
+ match fs2::total_space(&self.config.system.path) {
+ Ok(space) => stats.total_disk_space = space.into(),
+ Err(err) => {
+ tracing::warn!(
+ "Failed to get total disk space for '{}': {err}",
+ self.config.system.path
+ );
+ }
+ }
+
Ok(stats)
})
}
diff --git a/core/server/src/shard/tasks/periodic/sysinfo_printer.rs b/core/server/src/shard/tasks/periodic/sysinfo_printer.rs
index 82b81edf1..4bddf4798 100644
--- a/core/server/src/shard/tasks/periodic/sysinfo_printer.rs
+++ b/core/server/src/shard/tasks/periodic/sysinfo_printer.rs
@@ -69,6 +69,12 @@ async fn print_sysinfo(shard: Rc<IggyShard>) -> Result<(), IggyError> {
/ stats.total_memory.as_bytes_u64() as f64)
* 100f64;
+ let threads_info = if stats.threads_count > 0 {
+ format!(", Threads: {}", stats.threads_count)
+ } else {
+ String::new()
+ };
+
let open_files_info = if let Some(open_files) = get_open_file_descriptors() {
format!(", OpenFDs: {}", open_files)
} else {
@@ -76,17 +82,21 @@ async fn print_sysinfo(shard: Rc<IggyShard>) -> Result<(), IggyError> {
};
info!(
- "CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages: {}, Read: {}, Written: {}{}",
+ "CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Disk: {}/{} (Free/Total), IggyUsage: {}, Clients: {}, Messages: {}, Read: {}, Written: {}{}{}",
stats.cpu_usage,
stats.total_cpu_usage,
free_memory_percent,
stats.memory_usage,
stats.total_memory - stats.available_memory,
stats.total_memory,
+ stats.free_disk_space,
+ stats.total_disk_space,
+ stats.messages_size_bytes,
stats.clients_count.human_count_bare().to_string(),
stats.messages_count.human_count_bare().to_string(),
stats.read_bytes,
stats.written_bytes,
+ threads_info,
open_files_info,
);