This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-versions
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 9f4d886dd404bad1d15271b8dbab439bbc07eb3a
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Mon Feb 2 16:19:10 2026 +0100

    feat(connectors): add connector version to stats endpoint
---
 core/connectors/runtime/src/context.rs        |  2 ++
 core/connectors/runtime/src/main.rs           |  4 ++++
 core/connectors/runtime/src/manager/sink.rs   |  1 +
 core/connectors/runtime/src/manager/source.rs |  1 +
 core/connectors/runtime/src/sink.rs           | 13 +++++++++++++
 core/connectors/runtime/src/source.rs         | 13 +++++++++++++
 core/connectors/runtime/src/stats.rs          | 14 ++++++++++++++
 core/connectors/sdk/src/sink.rs               |  7 +++++++
 core/connectors/sdk/src/source.rs             |  7 +++++++
 9 files changed, 62 insertions(+)

diff --git a/core/connectors/runtime/src/context.rs 
b/core/connectors/runtime/src/context.rs
index 20d9906ca..831c76c7a 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -93,6 +93,7 @@ fn map_sinks(
                     key: sink_plugin.key.to_owned(),
                     name: sink_plugin.name.to_owned(),
                     path: sink_plugin.path.to_owned(),
+                    version: sink_plugin.version.to_owned(),
                     enabled: sink_config.enabled,
                     status,
                     last_error: sink_plugin
@@ -134,6 +135,7 @@ fn map_sources(
                     key: source_plugin.key.to_owned(),
                     name: source_plugin.name.to_owned(),
                     path: source_plugin.path.to_owned(),
+                    version: source_plugin.version.to_owned(),
                     enabled: source_config.enabled,
                     status,
                     last_error: source_plugin
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index b71bd5eef..f9b187c83 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -81,6 +81,7 @@ struct SourceApi {
     ) -> i32,
     handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
     close: extern "C" fn(id: u32) -> i32,
+    version: extern "C" fn() -> *const std::ffi::c_char,
 }
 
 #[derive(WrapperApi)]
@@ -102,6 +103,7 @@ struct SinkApi {
         messages_len: usize,
     ) -> i32,
     close: extern "C" fn(id: u32) -> i32,
+    version: extern "C" fn() -> *const std::ffi::c_char,
 }
 
 fn print_ascii_art(text: &str) {
@@ -280,6 +282,7 @@ struct SinkConnectorPlugin {
     key: String,
     name: String,
     path: String,
+    version: String,
     config_format: Option<ConfigFormat>,
     consumers: Vec<SinkConnectorConsumer>,
     error: Option<String>,
@@ -313,6 +316,7 @@ struct SourceConnectorPlugin {
     key: String,
     name: String,
     path: String,
+    version: String,
     config_format: Option<ConfigFormat>,
     transforms: Vec<Arc<dyn Transform>>,
     producer: Option<SourceConnectorProducer>,
diff --git a/core/connectors/runtime/src/manager/sink.rs 
b/core/connectors/runtime/src/manager/sink.rs
index 495693716..50d98a29a 100644
--- a/core/connectors/runtime/src/manager/sink.rs
+++ b/core/connectors/runtime/src/manager/sink.rs
@@ -104,6 +104,7 @@ pub struct SinkInfo {
     pub key: String,
     pub name: String,
     pub path: String,
+    pub version: String,
     pub enabled: bool,
     pub status: ConnectorStatus,
     pub last_error: Option<ConnectorError>,
diff --git a/core/connectors/runtime/src/manager/source.rs 
b/core/connectors/runtime/src/manager/source.rs
index 051668640..a63c6d1df 100644
--- a/core/connectors/runtime/src/manager/source.rs
+++ b/core/connectors/runtime/src/manager/source.rs
@@ -104,6 +104,7 @@ pub struct SourceInfo {
     pub key: String,
     pub name: String,
     pub path: String,
+    pub version: String,
     pub enabled: bool,
     pub status: ConnectorStatus,
     pub last_error: Option<ConnectorError>,
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index 0987eb7e5..ccd8b9d98 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -65,6 +65,7 @@ pub async fn init(
         let init_error: Option<String>;
         if let Some(container) = sink_connectors.get_mut(&path) {
             info!("Sink container for plugin: {path} is already loaded.",);
+            let version = get_plugin_version(&container.container);
             init_error = init_sink(
                 &container.container,
                 &config.plugin_config.unwrap_or_default(),
@@ -77,6 +78,7 @@ pub async fn init(
                 key: key.to_owned(),
                 name: name.to_owned(),
                 path: path.to_owned(),
+                version,
                 config_format: config.plugin_config_format,
                 consumers: vec![],
                 error: init_error.clone(),
@@ -86,6 +88,7 @@ pub async fn init(
             let container: Container<SinkApi> =
                 unsafe { Container::load(&path).expect("Failed to load sink 
container") };
             info!("Sink container for plugin: {path} loaded successfully.",);
+            let version = get_plugin_version(&container);
             init_error = init_sink(
                 &container,
                 &config.plugin_config.unwrap_or_default(),
@@ -102,6 +105,7 @@ pub async fn init(
                         key: key.to_owned(),
                         name: name.to_owned(),
                         path: path.to_owned(),
+                        version,
                         config_format: config.plugin_config_format,
                         consumers: vec![],
                         error: init_error.clone(),
@@ -331,6 +335,15 @@ async fn consume_messages(
     Ok(())
 }
 
+fn get_plugin_version(container: &Container<SinkApi>) -> String {
+    unsafe {
+        let version_ptr = (container.version)();
+        std::ffi::CStr::from_ptr(version_ptr)
+            .to_string_lossy()
+            .into_owned()
+    }
+}
+
 fn init_sink(
     container: &Container<SinkApi>,
     plugin_config: &serde_json::Value,
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index c4f155b5a..727544f16 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -75,6 +75,7 @@ pub async fn init(
         let init_error: Option<String>;
         if let Some(container) = source_connectors.get_mut(&path) {
             info!("Source container for plugin: {path} is already loaded.",);
+            let version = get_plugin_version(&container.container);
             init_error = init_source(
                 &container.container,
                 &config.plugin_config.unwrap_or_default(),
@@ -88,6 +89,7 @@ pub async fn init(
                 key: key.to_owned(),
                 name: name.to_owned(),
                 path: path.to_owned(),
+                version,
                 config_format: config.plugin_config_format,
                 producer: None,
                 transforms: vec![],
@@ -99,6 +101,7 @@ pub async fn init(
             let container: Container<SourceApi> =
                 unsafe { Container::load(&path).expect("Failed to load source 
container") };
             info!("Source container for plugin: {path} loaded successfully.",);
+            let version = get_plugin_version(&container);
             init_error = init_source(
                 &container,
                 &config.plugin_config.unwrap_or_default(),
@@ -116,6 +119,7 @@ pub async fn init(
                         key: key.to_owned(),
                         name: name.to_owned(),
                         path: path.to_owned(),
+                        version,
                         config_format: config.plugin_config_format,
                         producer: None,
                         transforms: vec![],
@@ -187,6 +191,15 @@ pub async fn init(
     Ok(source_connectors)
 }
 
+fn get_plugin_version(container: &Container<SourceApi>) -> String {
+    unsafe {
+        let version_ptr = (container.version)();
+        std::ffi::CStr::from_ptr(version_ptr)
+            .to_string_lossy()
+            .into_owned()
+    }
+}
+
 fn init_source(
     container: &Container<SourceApi>,
     plugin_config: &serde_json::Value,
diff --git a/core/connectors/runtime/src/stats.rs 
b/core/connectors/runtime/src/stats.rs
index 7a3e7717b..3cf0664e1 100644
--- a/core/connectors/runtime/src/stats.rs
+++ b/core/connectors/runtime/src/stats.rs
@@ -22,6 +22,7 @@ use crate::manager::status::ConnectorStatus;
 use crate::metrics::ConnectorType;
 use iggy_common::{IggyTimestamp, SemanticVersion};
 use serde::Serialize;
+use std::str::FromStr;
 use std::sync::Arc;
 use sysinfo::System;
 
@@ -52,6 +53,9 @@ pub struct ConnectorStats {
     pub key: String,
     pub name: String,
     pub connector_type: String,
+    pub version: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub version_semver: Option<u32>,
     pub status: ConnectorStatus,
     pub enabled: bool,
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -95,10 +99,15 @@ pub async fn get_runtime_stats(context: 
&Arc<RuntimeContext>) -> ConnectorRuntim
 
     let mut connectors = Vec::with_capacity(sources.len() + sinks.len());
     for source in &sources {
+        let version_semver = SemanticVersion::from_str(&source.version)
+            .ok()
+            .and_then(|v| v.get_numeric_version().ok());
         connectors.push(ConnectorStats {
             key: source.key.clone(),
             name: source.name.clone(),
             connector_type: "source".to_owned(),
+            version: source.version.clone(),
+            version_semver,
             status: source.status,
             enabled: source.enabled,
             messages_produced: 
Some(context.metrics.get_messages_produced(&source.key)),
@@ -111,10 +120,15 @@ pub async fn get_runtime_stats(context: 
&Arc<RuntimeContext>) -> ConnectorRuntim
         });
     }
     for sink in &sinks {
+        let version_semver = SemanticVersion::from_str(&sink.version)
+            .ok()
+            .and_then(|v| v.get_numeric_version().ok());
         connectors.push(ConnectorStats {
             key: sink.key.clone(),
             name: sink.name.clone(),
             connector_type: "sink".to_owned(),
+            version: sink.version.clone(),
+            version_semver,
             status: sink.status,
             enabled: sink.enabled,
             messages_produced: None,
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 457618e75..4176e5c65 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -277,5 +277,12 @@ macro_rules! sink_connector {
             };
             instance.1.close()
         }
+
+        #[cfg(not(test))]
+        #[unsafe(no_mangle)]
+        extern "C" fn version() -> *const std::ffi::c_char {
+            static VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), "\0");
+            VERSION.as_ptr() as *const std::ffi::c_char
+        }
     };
 }
diff --git a/core/connectors/sdk/src/source.rs 
b/core/connectors/sdk/src/source.rs
index 16b45aee7..3531c4277 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -263,5 +263,12 @@ macro_rules! source_connector {
             };
             instance.1.close()
         }
+
+        #[cfg(not(test))]
+        #[unsafe(no_mangle)]
+        extern "C" fn version() -> *const std::ffi::c_char {
+            static VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), "\0");
+            VERSION.as_ptr() as *const std::ffi::c_char
+        }
     };
 }

Reply via email to