This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 393b0f6fa feat(connectors): extend published stats (#2640)
393b0f6fa is described below
commit 393b0f6fabc49c99965c452fcab6066823bf8cd6
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Fri Jan 30 13:22:40 2026 +0100
feat(connectors): extend published stats (#2640)
---
core/common/src/lib.rs | 35 ++++++++++++----------
core/common/src/utils/mod.rs | 35 ++++++++++++----------
.../{server/src => common/src/utils}/versioning.rs | 27 +++--------------
core/connectors/runtime/src/stats.rs | 30 +++++++++++++++----
core/integration/tests/state/mod.rs | 5 ++--
core/server/src/bootstrap.rs | 2 +-
core/server/src/lib.rs | 8 ++---
core/server/src/main.rs | 2 +-
core/server/src/shard/builder.rs | 5 ++--
core/server/src/shard/mod.rs | 7 +++--
core/server/src/shard/system/info.rs | 5 ++--
core/server/src/state/file.rs | 5 ++--
12 files changed, 89 insertions(+), 77 deletions(-)
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index f5a955ac2..c8391d601 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -1,19 +1,21 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
mod alloc;
mod certificates;
@@ -105,3 +107,4 @@ pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
pub use utils::text;
pub use utils::timestamp::*;
pub use utils::topic_size::MaxTopicSize;
+pub use utils::versioning::SemanticVersion;
diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs
index aa8714daa..34e080b6c 100644
--- a/core/common/src/utils/mod.rs
+++ b/core/common/src/utils/mod.rs
@@ -1,19 +1,21 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
pub(crate) mod byte_size;
pub(crate) mod checksum;
@@ -25,3 +27,4 @@ pub(crate) mod personal_access_token_expiry;
pub mod text;
pub(crate) mod timestamp;
pub(crate) mod topic_size;
+pub(crate) mod versioning;
diff --git a/core/server/src/versioning.rs b/core/common/src/utils/versioning.rs
similarity index 94%
rename from core/server/src/versioning.rs
rename to core/common/src/utils/versioning.rs
index 5412217ef..3356b1bad 100644
--- a/core/server/src/versioning.rs
+++ b/core/common/src/utils/versioning.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -16,8 +17,7 @@
* under the License.
*/
-use crate::VERSION;
-use iggy_common::IggyError;
+use crate::IggyError;
use std::borrow::Cow;
use std::fmt::Display;
use std::str::FromStr;
@@ -150,11 +150,7 @@ impl FromStr for SemanticVersion {
}
impl SemanticVersion {
- pub const fn current() -> Self {
- Self::parse_const(VERSION)
- }
-
- const fn parse_const(s: &'static str) -> Self {
+ pub const fn parse_const(s: &'static str) -> Self {
let bytes = s.as_bytes();
// Split on '+' to ignore build metadata
@@ -327,21 +323,6 @@ mod tests {
assert_eq!(SEMVER_3.prerelease, Some(Cow::Borrowed("alpha.2")));
}
- #[test]
- fn test_semantic_version_current() {
- let version = SemanticVersion::current();
-
- assert!(version.major < 1000);
- assert!(version.minor < 1000);
- assert!(version.patch < 1000);
- }
-
- #[test]
- fn should_load_the_expected_version_from_package_definition() {
- const CARGO_TOML_VERSION: &str = env!("CARGO_PKG_VERSION");
- assert_eq!(crate::VERSION, CARGO_TOML_VERSION);
- }
-
#[test]
fn should_parse_basic_semantic_version() {
let version = "1.2.3".parse::<SemanticVersion>().unwrap();
diff --git a/core/connectors/runtime/src/stats.rs b/core/connectors/runtime/src/stats.rs
index df2ca62ed..7a3e7717b 100644
--- a/core/connectors/runtime/src/stats.rs
+++ b/core/connectors/runtime/src/stats.rs
@@ -18,17 +18,26 @@
*/
use crate::context::RuntimeContext;
+use crate::manager::status::ConnectorStatus;
use crate::metrics::ConnectorType;
-use iggy_common::IggyTimestamp;
+use iggy_common::{IggyTimestamp, SemanticVersion};
use serde::Serialize;
use std::sync::Arc;
use sysinfo::System;
+const VERSION: &str = env!("CARGO_PKG_VERSION");
+const SEMANTIC_VERSION: SemanticVersion = SemanticVersion::parse_const(VERSION);
+
#[derive(Debug, Serialize)]
pub struct ConnectorRuntimeStats {
+ pub connectors_runtime_version: String,
+ pub connectors_runtime_version_semver: Option<u32>,
pub process_id: u32,
pub cpu_usage: f32,
+ pub total_cpu_usage: f32,
pub memory_usage: u64,
+ pub total_memory: u64,
+ pub available_memory: u64,
pub run_time: u64,
pub start_time: u64,
pub sources_total: u32,
@@ -43,7 +52,7 @@ pub struct ConnectorStats {
pub key: String,
pub name: String,
pub connector_type: String,
- pub status: String,
+ pub status: ConnectorStatus,
pub enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub messages_produced: Option<u64>,
@@ -59,12 +68,18 @@ pub struct ConnectorStats {
pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntimeStats {
let pid = std::process::id();
- let mut system = System::new();
+ let mut system = System::new_all();
+ system.refresh_cpu_all();
+ system.refresh_memory();
system.refresh_processes(
sysinfo::ProcessesToUpdate::Some(&[sysinfo::Pid::from_u32(pid)]),
true,
);
+ let total_cpu_usage = system.global_cpu_usage();
+ let total_memory = system.total_memory();
+ let available_memory = system.available_memory();
+
let (cpu_usage, memory_usage) = system
.process(sysinfo::Pid::from_u32(pid))
.map(|p| (p.cpu_usage(), p.memory()))
@@ -84,7 +99,7 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim
key: source.key.clone(),
name: source.name.clone(),
connector_type: "source".to_owned(),
- status: source.status.to_string(),
+ status: source.status,
enabled: source.enabled,
messages_produced: Some(context.metrics.get_messages_produced(&source.key)),
messages_sent: Some(context.metrics.get_messages_sent(&source.key)),
@@ -100,7 +115,7 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim
key: sink.key.clone(),
name: sink.name.clone(),
connector_type: "sink".to_owned(),
- status: sink.status.to_string(),
+ status: sink.status,
enabled: sink.enabled,
messages_produced: None,
messages_sent: None,
@@ -115,9 +130,14 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim
let run_time = now.saturating_sub(start);
ConnectorRuntimeStats {
+ connectors_runtime_version: VERSION.to_owned(),
+ connectors_runtime_version_semver: SEMANTIC_VERSION.get_numeric_version().ok(),
process_id: pid,
cpu_usage,
+ total_cpu_usage,
memory_usage,
+ total_memory,
+ available_memory,
run_time,
start_time: start,
sources_total,
diff --git a/core/integration/tests/state/mod.rs b/core/integration/tests/state/mod.rs
index 2ff235c45..586b20fa8 100644
--- a/core/integration/tests/state/mod.rs
+++ b/core/integration/tests/state/mod.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -18,10 +19,10 @@
use compio::fs::create_dir;
use iggy::prelude::{Aes256GcmEncryptor, EncryptorKind};
+use iggy_common::SemanticVersion;
use server::state::file::FileState;
use server::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use server::streaming::utils::file::overwrite;
-use server::versioning::SemanticVersion;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 12174ed4b..70703e43b 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -49,10 +49,10 @@ use crate::{
users::user::User,
utils::{crypto, file::overwrite},
},
- versioning::SemanticVersion,
};
use compio::{fs::create_dir_all, runtime::Runtime};
use err_trail::ErrContext;
+use iggy_common::SemanticVersion;
use iggy_common::{
IggyByteSize, IggyError, PersonalAccessToken,
defaults::{
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index b6a758006..1fc0d0d3f 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -19,7 +20,7 @@
#[cfg(not(feature = "disable-mimalloc"))]
use mimalloc::MiMalloc;
-use crate::versioning::SemanticVersion;
+use iggy_common::SemanticVersion;
#[cfg(not(feature = "disable-mimalloc"))]
#[global_allocator]
@@ -44,10 +45,9 @@ pub mod shard;
pub mod state;
pub mod streaming;
pub mod tcp;
-pub mod versioning;
pub mod websocket;
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
-pub const SEMANTIC_VERSION: SemanticVersion = SemanticVersion::current();
+pub const SEMANTIC_VERSION: SemanticVersion = SemanticVersion::parse_const(VERSION);
pub const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
pub const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 35ccbffc0..d3ce11f49 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -23,6 +23,7 @@ use dashmap::DashMap;
use dotenvy::dotenv;
use err_trail::ErrContext;
use figlet_rs::FIGfont;
+use iggy_common::SemanticVersion;
use iggy_common::sharding::{IggyNamespace, LocalIdx, PartitionLocation, ShardId};
use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, MemoryPool};
use server::SEMANTIC_VERSION;
@@ -45,7 +46,6 @@ use server::streaming::clients::client_manager::{Client, ClientManager};
use server::streaming::diagnostics::metrics::Metrics;
use server::streaming::storage::SystemStorage;
use server::streaming::utils::ptr::EternalPtr;
-use server::versioning::SemanticVersion;
use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::str::FromStr;
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index ee29bc6a8..ad8267eac 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -29,11 +30,11 @@ use crate::{
clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
utils::ptr::EternalPtr,
},
- versioning::SemanticVersion,
};
use ahash::AHashSet;
use dashmap::DashMap;
use iggy_common::EncryptorKind;
+use iggy_common::SemanticVersion;
use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use std::{
cell::{Cell, RefCell},
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 68b0902c8..7f9318e53 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -1,5 +1,6 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
-inner() * or more contributor license agreements. See the NOTICE file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
@@ -40,10 +41,10 @@ use crate::{
clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
session::Session,
utils::ptr::EternalPtr,
},
- versioning::SemanticVersion,
};
use builder::IggyShardBuilder;
use dashmap::DashMap;
+use iggy_common::SemanticVersion;
use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use iggy_common::{EncryptorKind, IggyError};
use std::{
diff --git a/core/server/src/shard/system/info.rs b/core/server/src/shard/system/info.rs
index 3c20fd63b..e5bd5581e 100644
--- a/core/server/src/shard/system/info.rs
+++ b/core/server/src/shard/system/info.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -16,7 +17,7 @@
* under the License.
*/
-use crate::versioning::SemanticVersion;
+use iggy_common::SemanticVersion;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::fmt::Display;
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index c83f2950d..08f91baf8 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -20,7 +21,6 @@ use crate::state::command::EntryCommand;
use crate::state::{COMPONENT, StateEntry};
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::utils::file;
-use crate::versioning::SemanticVersion;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use compio::io::AsyncReadExt;
use err_trail::ErrContext;
@@ -29,6 +29,7 @@ use iggy_common::EncryptorKind;
use iggy_common::IggyByteSize;
use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
+use iggy_common::SemanticVersion;
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;