This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_getclustermetadata in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 85977024dc3283e91d9f8c93eaf929552c406659 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Sep 23 13:28:53 2025 +0200 feat(io_uring): fix GetClusterMetadata command/endpoint --- .../cluster/get_cluster_metadata_handler.rs | 8 ++--- core/server/src/http/system.rs | 7 ++-- .../cluster/mod.rs => shard/system/cluster.rs} | 41 ++++++++++++---------- core/server/src/shard/system/mod.rs | 1 + 4 files changed, 27 insertions(+), 30 deletions(-) diff --git a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs index 5f811281..7de09769 100644 --- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs +++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs @@ -40,18 +40,14 @@ impl ServerCommandHandler for GetClusterMetadata { sender: &mut SenderKind, _length: u32, session: &Rc<Session>, - system: &Rc<IggyShard>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { - // TODO: fix it; - /* debug!("session: {session}, command: {self}"); - let system = system.read().await; - let cluster_metadata = system.get_cluster_metadata(session)?; + let cluster_metadata = shard.get_cluster_metadata(session)?; let response = cluster_metadata.to_bytes(); sender.send_ok_response(&response).await?; - */ Ok(()) } } diff --git a/core/server/src/http/system.rs b/core/server/src/http/system.rs index 0fece9d4..d4e2b0f6 100644 --- a/core/server/src/http/system.rs +++ b/core/server/src/http/system.rs @@ -77,11 +77,9 @@ async fn get_cluster_metadata( State(state): State<Arc<AppState>>, Extension(identity): Extension<Identity>, ) -> Result<Json<ClusterMetadata>, CustomError> { - todo!(); - /* let shard = state.shard.shard(); - let cluster_metadata = shard - .get_cluster_metadata(&Session::stateless(identity.user_id, identity.ip_address)) + let session = Session::stateless(identity.user_id, identity.ip_address); + let cluster_metadata = shard.get_cluster_metadata(&session) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get cluster metadata, user ID: {}", @@ -89,7 +87,6 @@ async fn get_cluster_metadata( ) })?; Ok(Json(cluster_metadata)) - */ } async fn get_client( diff --git a/core/server/src/streaming/cluster/mod.rs b/core/server/src/shard/system/cluster.rs similarity index 71% rename from core/server/src/streaming/cluster/mod.rs rename to core/server/src/shard/system/cluster.rs index 036ebe1c..f2bca11b 100644 --- a/core/server/src/streaming/cluster/mod.rs +++ b/core/server/src/shard/system/cluster.rs @@ -16,25 +16,33 @@ * under the License. */ -/* +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::System; -use iggy_common::IggyError; -use iggy_common::metadata::ClusterMetadata; -use iggy_common::node::ClusterNode; -use iggy_common::role::ClusterNodeRole; -use iggy_common::status::ClusterNodeStatus; +use iggy_common::{ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError, TransportProtocol}; +use std::str::FromStr; use tracing::trace; -impl System { +impl IggyShard { pub fn get_cluster_metadata(&self, session: &Session) -> Result<ClusterMetadata, IggyError> { trace!("Getting cluster metadata for session: {session}"); + if !self.config.cluster.enabled { + return Err(IggyError::FeatureUnavailable); + } + // TODO(hubcio): Clustering is not yet implemented // The leader/follower as well as node status are currently placeholder implementations. + let name = self.config.cluster.name.clone(); + let id = self.config.cluster.id; + + // Parse transport string to TransportProtocol enum + let transport = TransportProtocol::from_str(&self.config.cluster.transport) + .map_err(|_| IggyError::InvalidConfiguration)?; + let nodes: Vec<ClusterNode> = self - .cluster_config + .config + .cluster .nodes .iter() .map(|node_config| { @@ -44,11 +52,7 @@ impl System { ClusterNodeRole::Follower }; - let status = if self.cluster_config.enabled { - ClusterNodeStatus::Healthy - } else { - ClusterNodeStatus::Stopping - }; + let status = ClusterNodeStatus::Healthy; ClusterNode { id: node_config.id, @@ -61,11 +65,10 @@ impl System { .collect(); Ok(ClusterMetadata { - name: self.cluster_config.name.clone(), - id: self.cluster_config.id, - transport: self.cluster_config.transport.clone(), + name, + id, + transport, nodes, }) } -} - */ +} \ No newline at end of file diff --git a/core/server/src/shard/system/mod.rs b/core/server/src/shard/system/mod.rs index 7398650a..73bed645 100644 --- a/core/server/src/shard/system/mod.rs +++ b/core/server/src/shard/system/mod.rs @@ -17,6 +17,7 @@ */ pub mod clients; +pub mod cluster; pub mod consumer_groups; pub mod consumer_offsets; pub mod info;
