This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch update-connectors-stats in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 69034246a91bea155c11df8a87d24f130f1a2c51 Author: Maciej Modzelewski <[email protected]> AuthorDate: Fri Jan 30 12:02:54 2026 +0100 feat(connectors): extend published stats --- core/common/src/lib.rs | 35 ++++++++++++---------- core/common/src/utils/mod.rs | 35 ++++++++++++---------- .../{server/src => common/src/utils}/versioning.rs | 27 +++-------------- core/connectors/runtime/src/stats.rs | 30 +++++++++++++++---- core/integration/tests/state/mod.rs | 5 ++-- core/server/src/bootstrap.rs | 2 +- core/server/src/lib.rs | 8 ++--- core/server/src/main.rs | 2 +- core/server/src/shard/builder.rs | 5 ++-- core/server/src/shard/mod.rs | 7 +++-- core/server/src/shard/system/info.rs | 5 ++-- core/server/src/state/file.rs | 5 ++-- 12 files changed, 89 insertions(+), 77 deletions(-) diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index f5a955ac2..c8391d601 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -1,19 +1,21 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ mod alloc; mod certificates; @@ -105,3 +107,4 @@ pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry; pub use utils::text; pub use utils::timestamp::*; pub use utils::topic_size::MaxTopicSize; +pub use utils::versioning::SemanticVersion; diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs index aa8714daa..34e080b6c 100644 --- a/core/common/src/utils/mod.rs +++ b/core/common/src/utils/mod.rs @@ -1,19 +1,21 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ pub(crate) mod byte_size; pub(crate) mod checksum; @@ -25,3 +27,4 @@ pub(crate) mod personal_access_token_expiry; pub mod text; pub(crate) mod timestamp; pub(crate) mod topic_size; +pub(crate) mod versioning; diff --git a/core/server/src/versioning.rs b/core/common/src/utils/versioning.rs similarity index 94% rename from core/server/src/versioning.rs rename to core/common/src/utils/versioning.rs index 5412217ef..3356b1bad 100644 --- a/core/server/src/versioning.rs +++ b/core/common/src/utils/versioning.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -16,8 +17,7 @@ * under the License. */ -use crate::VERSION; -use iggy_common::IggyError; +use crate::IggyError; use std::borrow::Cow; use std::fmt::Display; use std::str::FromStr; @@ -150,11 +150,7 @@ impl FromStr for SemanticVersion { } impl SemanticVersion { - pub const fn current() -> Self { - Self::parse_const(VERSION) - } - - const fn parse_const(s: &'static str) -> Self { + pub const fn parse_const(s: &'static str) -> Self { let bytes = s.as_bytes(); // Split on '+' to ignore build metadata @@ -327,21 +323,6 @@ mod tests { assert_eq!(SEMVER_3.prerelease, Some(Cow::Borrowed("alpha.2"))); } - #[test] - fn test_semantic_version_current() { - let version = SemanticVersion::current(); - - assert!(version.major < 1000); - assert!(version.minor < 1000); - assert!(version.patch < 1000); - } - - #[test] - fn should_load_the_expected_version_from_package_definition() { - const CARGO_TOML_VERSION: &str = env!("CARGO_PKG_VERSION"); - assert_eq!(crate::VERSION, CARGO_TOML_VERSION); - } - #[test] fn should_parse_basic_semantic_version() { let version = "1.2.3".parse::<SemanticVersion>().unwrap(); diff --git a/core/connectors/runtime/src/stats.rs b/core/connectors/runtime/src/stats.rs index df2ca62ed..94523338b 100644 --- a/core/connectors/runtime/src/stats.rs +++ b/core/connectors/runtime/src/stats.rs @@ -18,17 +18,26 @@ */ use crate::context::RuntimeContext; +use crate::manager::status::ConnectorStatus; use crate::metrics::ConnectorType; -use iggy_common::IggyTimestamp; +use iggy_common::{IggyTimestamp, SemanticVersion}; use serde::Serialize; use std::sync::Arc; use sysinfo::System; +const VERSION: &str = env!("CARGO_PKG_VERSION"); +const SEMANTIC_VERSION: SemanticVersion = SemanticVersion::parse_const(VERSION); + #[derive(Debug, Serialize)] pub struct ConnectorRuntimeStats { + pub version: String, + pub version_semver: Option<u32>, pub process_id: u32, pub cpu_usage: f32, + pub total_cpu_usage: f32, pub memory_usage: u64, + pub total_memory: u64, + pub available_memory: u64, pub run_time: u64, pub start_time: u64, pub sources_total: u32, @@ -43,7 +52,7 @@ pub struct ConnectorStats { pub key: String, pub name: String, pub connector_type: String, - pub status: String, + pub status: ConnectorStatus, pub enabled: bool, #[serde(skip_serializing_if = "Option::is_none")] pub messages_produced: Option<u64>, @@ -59,12 +68,18 @@ pub struct ConnectorStats { pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntimeStats { let pid = std::process::id(); - let mut system = System::new(); + let mut system = System::new_all(); + system.refresh_cpu_all(); + system.refresh_memory(); system.refresh_processes( sysinfo::ProcessesToUpdate::Some(&[sysinfo::Pid::from_u32(pid)]), true, ); + let total_cpu_usage = system.global_cpu_usage(); + let total_memory = system.total_memory(); + let available_memory = system.available_memory(); + let (cpu_usage, memory_usage) = system .process(sysinfo::Pid::from_u32(pid)) .map(|p| (p.cpu_usage(), p.memory())) @@ -84,7 +99,7 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim key: source.key.clone(), name: source.name.clone(), connector_type: "source".to_owned(), - status: source.status.to_string(), + status: source.status, enabled: source.enabled, messages_produced: Some(context.metrics.get_messages_produced(&source.key)), messages_sent: Some(context.metrics.get_messages_sent(&source.key)), @@ -100,7 +115,7 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim key: sink.key.clone(), name: sink.name.clone(), connector_type: "sink".to_owned(), - status: sink.status.to_string(), + status: sink.status, enabled: sink.enabled, messages_produced: None, messages_sent: None, @@ -115,9 +130,14 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim let run_time = now.saturating_sub(start); ConnectorRuntimeStats { + version: VERSION.to_owned(), + version_semver: SEMANTIC_VERSION.get_numeric_version().ok(), process_id: pid, cpu_usage, + total_cpu_usage, memory_usage, + total_memory, + available_memory, run_time, start_time: start, sources_total, diff --git a/core/integration/tests/state/mod.rs b/core/integration/tests/state/mod.rs index 2ff235c45..586b20fa8 100644 --- a/core/integration/tests/state/mod.rs +++ b/core/integration/tests/state/mod.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -18,10 +19,10 @@ use compio::fs::create_dir; use iggy::prelude::{Aes256GcmEncryptor, EncryptorKind}; +use iggy_common::SemanticVersion; use server::state::file::FileState; use server::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind}; use server::streaming::utils::file::overwrite; -use server::versioning::SemanticVersion; use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64}; diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 12174ed4b..70703e43b 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -49,10 +49,10 @@ use crate::{ users::user::User, utils::{crypto, file::overwrite}, }, - versioning::SemanticVersion, }; use compio::{fs::create_dir_all, runtime::Runtime}; use err_trail::ErrContext; +use iggy_common::SemanticVersion; use iggy_common::{ IggyByteSize, IggyError, PersonalAccessToken, defaults::{ diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index b6a758006..1fc0d0d3f 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -19,7 +20,7 @@ #[cfg(not(feature = "disable-mimalloc"))] use mimalloc::MiMalloc; -use crate::versioning::SemanticVersion; +use iggy_common::SemanticVersion; #[cfg(not(feature = "disable-mimalloc"))] #[global_allocator] @@ -44,10 +45,9 @@ pub mod shard; pub mod state; pub mod streaming; pub mod tcp; -pub mod versioning; pub mod websocket; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub const SEMANTIC_VERSION: SemanticVersion = SemanticVersion::current(); +pub const SEMANTIC_VERSION: SemanticVersion = SemanticVersion::parse_const(VERSION); pub const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME"; pub const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD"; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 35ccbffc0..d3ce11f49 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -23,6 +23,7 @@ use dashmap::DashMap; use dotenvy::dotenv; use err_trail::ErrContext; use figlet_rs::FIGfont; +use iggy_common::SemanticVersion; use iggy_common::sharding::{IggyNamespace, LocalIdx, PartitionLocation, ShardId}; use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, MemoryPool}; use server::SEMANTIC_VERSION; @@ -45,7 +46,6 @@ use server::streaming::clients::client_manager::{Client, ClientManager}; use server::streaming::diagnostics::metrics::Metrics; use server::streaming::storage::SystemStorage; use server::streaming::utils::ptr::EternalPtr; -use server::versioning::SemanticVersion; use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::str::FromStr; diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index ee29bc6a8..ad8267eac 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -29,11 +30,11 @@ use crate::{ clients::client_manager::ClientManager, diagnostics::metrics::Metrics, utils::ptr::EternalPtr, }, - versioning::SemanticVersion, }; use ahash::AHashSet; use dashmap::DashMap; use iggy_common::EncryptorKind; +use iggy_common::SemanticVersion; use iggy_common::sharding::{IggyNamespace, PartitionLocation}; use std::{ cell::{Cell, RefCell}, diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 68b0902c8..7f9318e53 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -1,5 +1,6 @@ -/* Licensed to the Apache Software Foundation (ASF) under one -inner() * or more contributor license agreements. See the NOTICE file +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the @@ -40,10 +41,10 @@ use crate::{ clients::client_manager::ClientManager, diagnostics::metrics::Metrics, session::Session, utils::ptr::EternalPtr, }, - versioning::SemanticVersion, }; use builder::IggyShardBuilder; use dashmap::DashMap; +use iggy_common::SemanticVersion; use iggy_common::sharding::{IggyNamespace, PartitionLocation}; use iggy_common::{EncryptorKind, IggyError}; use std::{ diff --git a/core/server/src/shard/system/info.rs b/core/server/src/shard/system/info.rs index 3c20fd63b..e5bd5581e 100644 --- a/core/server/src/shard/system/info.rs +++ b/core/server/src/shard/system/info.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -16,7 +17,7 @@ * under the License. */ -use crate::versioning::SemanticVersion; +use iggy_common::SemanticVersion; use serde::{Deserialize, Serialize}; use std::collections::hash_map::DefaultHasher; use std::fmt::Display; diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index c83f2950d..08f91baf8 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -20,7 +21,6 @@ use crate::state::command::EntryCommand; use crate::state::{COMPONENT, StateEntry}; use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::utils::file; -use crate::versioning::SemanticVersion; use bytes::{Buf, BufMut, Bytes, BytesMut}; use compio::io::AsyncReadExt; use err_trail::ErrContext; @@ -29,6 +29,7 @@ use iggy_common::EncryptorKind; use iggy_common::IggyByteSize; use iggy_common::IggyError; use iggy_common::IggyTimestamp; +use iggy_common::SemanticVersion; use std::fmt::Debug; use std::path::Path; use std::sync::Arc;
