This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-versions
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9f4d886dd404bad1d15271b8dbab439bbc07eb3a Author: Maciej Modzelewski <[email protected]> AuthorDate: Mon Feb 2 16:19:10 2026 +0100 feat(connectors): add connector version to stats endpoint --- core/connectors/runtime/src/context.rs | 2 ++ core/connectors/runtime/src/main.rs | 4 ++++ core/connectors/runtime/src/manager/sink.rs | 1 + core/connectors/runtime/src/manager/source.rs | 1 + core/connectors/runtime/src/sink.rs | 13 +++++++++++++ core/connectors/runtime/src/source.rs | 13 +++++++++++++ core/connectors/runtime/src/stats.rs | 14 ++++++++++++++ core/connectors/sdk/src/sink.rs | 7 +++++++ core/connectors/sdk/src/source.rs | 7 +++++++ 9 files changed, 62 insertions(+) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index 20d9906ca..831c76c7a 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -93,6 +93,7 @@ fn map_sinks( key: sink_plugin.key.to_owned(), name: sink_plugin.name.to_owned(), path: sink_plugin.path.to_owned(), + version: sink_plugin.version.to_owned(), enabled: sink_config.enabled, status, last_error: sink_plugin @@ -134,6 +135,7 @@ fn map_sources( key: source_plugin.key.to_owned(), name: source_plugin.name.to_owned(), path: source_plugin.path.to_owned(), + version: source_plugin.version.to_owned(), enabled: source_config.enabled, status, last_error: source_plugin diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index b71bd5eef..f9b187c83 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -81,6 +81,7 @@ struct SourceApi { ) -> i32, handle: extern "C" fn(id: u32, callback: SendCallback) -> i32, close: extern "C" fn(id: u32) -> i32, + version: extern "C" fn() -> *const std::ffi::c_char, } #[derive(WrapperApi)] @@ -102,6 +103,7 @@ struct SinkApi { messages_len: usize, ) -> i32, close: extern "C" fn(id: u32) -> i32, + version: extern "C" fn() -> *const std::ffi::c_char, } fn 
print_ascii_art(text: &str) { @@ -280,6 +282,7 @@ struct SinkConnectorPlugin { key: String, name: String, path: String, + version: String, config_format: Option<ConfigFormat>, consumers: Vec<SinkConnectorConsumer>, error: Option<String>, @@ -313,6 +316,7 @@ struct SourceConnectorPlugin { key: String, name: String, path: String, + version: String, config_format: Option<ConfigFormat>, transforms: Vec<Arc<dyn Transform>>, producer: Option<SourceConnectorProducer>, diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 495693716..50d98a29a 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -104,6 +104,7 @@ pub struct SinkInfo { pub key: String, pub name: String, pub path: String, + pub version: String, pub enabled: bool, pub status: ConnectorStatus, pub last_error: Option<ConnectorError>, diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 051668640..a63c6d1df 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -104,6 +104,7 @@ pub struct SourceInfo { pub key: String, pub name: String, pub path: String, + pub version: String, pub enabled: bool, pub status: ConnectorStatus, pub last_error: Option<ConnectorError>, diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 0987eb7e5..ccd8b9d98 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -65,6 +65,7 @@ pub async fn init( let init_error: Option<String>; if let Some(container) = sink_connectors.get_mut(&path) { info!("Sink container for plugin: {path} is already loaded.",); + let version = get_plugin_version(&container.container); init_error = init_sink( &container.container, &config.plugin_config.unwrap_or_default(), @@ -77,6 +78,7 @@ pub async fn init( key: key.to_owned(), name: name.to_owned(), path: 
path.to_owned(), + version, config_format: config.plugin_config_format, consumers: vec![], error: init_error.clone(), @@ -86,6 +88,7 @@ pub async fn init( let container: Container<SinkApi> = unsafe { Container::load(&path).expect("Failed to load sink container") }; info!("Sink container for plugin: {path} loaded successfully.",); + let version = get_plugin_version(&container); init_error = init_sink( &container, &config.plugin_config.unwrap_or_default(), @@ -102,6 +105,7 @@ pub async fn init( key: key.to_owned(), name: name.to_owned(), path: path.to_owned(), + version, config_format: config.plugin_config_format, consumers: vec![], error: init_error.clone(), @@ -331,6 +335,15 @@ async fn consume_messages( Ok(()) } +fn get_plugin_version(container: &Container<SinkApi>) -> String { + unsafe { + let version_ptr = (container.version)(); + std::ffi::CStr::from_ptr(version_ptr) + .to_string_lossy() + .into_owned() + } +} + fn init_sink( container: &Container<SinkApi>, plugin_config: &serde_json::Value, diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index c4f155b5a..727544f16 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -75,6 +75,7 @@ pub async fn init( let init_error: Option<String>; if let Some(container) = source_connectors.get_mut(&path) { info!("Source container for plugin: {path} is already loaded.",); + let version = get_plugin_version(&container.container); init_error = init_source( &container.container, &config.plugin_config.unwrap_or_default(), @@ -88,6 +89,7 @@ pub async fn init( key: key.to_owned(), name: name.to_owned(), path: path.to_owned(), + version, config_format: config.plugin_config_format, producer: None, transforms: vec![], @@ -99,6 +101,7 @@ pub async fn init( let container: Container<SourceApi> = unsafe { Container::load(&path).expect("Failed to load source container") }; info!("Source container for plugin: {path} loaded successfully.",); + let version = 
get_plugin_version(&container); init_error = init_source( &container, &config.plugin_config.unwrap_or_default(), @@ -116,6 +119,7 @@ pub async fn init( key: key.to_owned(), name: name.to_owned(), path: path.to_owned(), + version, config_format: config.plugin_config_format, producer: None, transforms: vec![], @@ -187,6 +191,15 @@ pub async fn init( Ok(source_connectors) } +fn get_plugin_version(container: &Container<SourceApi>) -> String { + unsafe { + let version_ptr = (container.version)(); + std::ffi::CStr::from_ptr(version_ptr) + .to_string_lossy() + .into_owned() + } +} + fn init_source( container: &Container<SourceApi>, plugin_config: &serde_json::Value, diff --git a/core/connectors/runtime/src/stats.rs b/core/connectors/runtime/src/stats.rs index 7a3e7717b..3cf0664e1 100644 --- a/core/connectors/runtime/src/stats.rs +++ b/core/connectors/runtime/src/stats.rs @@ -22,6 +22,7 @@ use crate::manager::status::ConnectorStatus; use crate::metrics::ConnectorType; use iggy_common::{IggyTimestamp, SemanticVersion}; use serde::Serialize; +use std::str::FromStr; use std::sync::Arc; use sysinfo::System; @@ -52,6 +53,9 @@ pub struct ConnectorStats { pub key: String, pub name: String, pub connector_type: String, + pub version: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub version_semver: Option<u32>, pub status: ConnectorStatus, pub enabled: bool, #[serde(skip_serializing_if = "Option::is_none")] @@ -95,10 +99,15 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim let mut connectors = Vec::with_capacity(sources.len() + sinks.len()); for source in &sources { + let version_semver = SemanticVersion::from_str(&source.version) + .ok() + .and_then(|v| v.get_numeric_version().ok()); connectors.push(ConnectorStats { key: source.key.clone(), name: source.name.clone(), connector_type: "source".to_owned(), + version: source.version.clone(), + version_semver, status: source.status, enabled: source.enabled, messages_produced: 
Some(context.metrics.get_messages_produced(&source.key)), @@ -111,10 +120,15 @@ pub async fn get_runtime_stats(context: &Arc<RuntimeContext>) -> ConnectorRuntim }); } for sink in &sinks { + let version_semver = SemanticVersion::from_str(&sink.version) + .ok() + .and_then(|v| v.get_numeric_version().ok()); connectors.push(ConnectorStats { key: sink.key.clone(), name: sink.name.clone(), connector_type: "sink".to_owned(), + version: sink.version.clone(), + version_semver, status: sink.status, enabled: sink.enabled, messages_produced: None, diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs index 457618e75..4176e5c65 100644 --- a/core/connectors/sdk/src/sink.rs +++ b/core/connectors/sdk/src/sink.rs @@ -277,5 +277,12 @@ macro_rules! sink_connector { }; instance.1.close() } + + #[cfg(not(test))] + #[unsafe(no_mangle)] + extern "C" fn version() -> *const std::ffi::c_char { + static VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), "\0"); + VERSION.as_ptr() as *const std::ffi::c_char + } }; } diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs index 16b45aee7..3531c4277 100644 --- a/core/connectors/sdk/src/source.rs +++ b/core/connectors/sdk/src/source.rs @@ -263,5 +263,12 @@ macro_rules! source_connector { }; instance.1.close() } + + #[cfg(not(test))] + #[unsafe(no_mangle)] + extern "C" fn version() -> *const std::ffi::c_char { + static VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), "\0"); + VERSION.as_ptr() as *const std::ffi::c_char + } }; }
