This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new b9487b2c fix(server): resolve TODOs and optimize reference usage (#2248)
b9487b2c is described below

commit b9487b2caf6a7dd0348d91e3b28d02002b0152cd
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Oct 7 11:46:50 2025 +0200

    fix(server): resolve TODOs and optimize reference usage (#2248)
    
    - Fix consumer_offsets to use with_partition_by_id directly
    - Move calculate_partition_id_by_messages_key_hash to topics helpers
    - Refactor purge_stream2 to use purge_stream2_base helper
    - Remove unnecessary user lookup in logout_user_base
    - Replace Arc with references in snapshot functions
    - Document snapshot thread pool limitation
---
 core/server/src/shard/system/consumer_offsets.rs | 26 ++++++++++++++-----
 core/server/src/shard/system/messages.rs         | 33 ++++++------------------
 core/server/src/shard/system/snapshot/mod.rs     | 20 +++++++++-----
 core/server/src/shard/system/streams.rs          | 13 +---------
 core/server/src/shard/system/users.rs            | 14 ++--------
 core/server/src/streaming/topics/helpers.rs      | 18 +++++++++++++
 6 files changed, 61 insertions(+), 63 deletions(-)

diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs
index af5f71bb..47ab62a6 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -193,14 +193,23 @@ impl IggyShard {
         partition_id: usize,
         offset: u64,
     ) {
-        // TODO: This can use `with_partition_by_id` directly.
+        let stream_id_num = self
+            .streams2
+            .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+        let topic_id_num =
+            self.streams2
+                .with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id());
+
         match polling_consumer {
             PollingConsumer::Consumer(id, _) => {
-                self.streams2.with_stream_by_id(
+                self.streams2.with_partition_by_id(
                     stream_id,
-                    streams::helpers::store_consumer_offset(
+                    topic_id,
+                    partition_id,
+                    partitions::helpers::store_consumer_offset(
                         *id,
-                        topic_id,
+                        stream_id_num,
+                        topic_id_num,
                         partition_id,
                         offset,
                         &self.config.system,
@@ -208,11 +217,14 @@ impl IggyShard {
                 );
             }
             PollingConsumer::ConsumerGroup(_, id) => {
-                self.streams2.with_stream_by_id(
+                self.streams2.with_partition_by_id(
                     stream_id,
-                    streams::helpers::store_consumer_group_member_offset(
+                    topic_id,
+                    partition_id,
+                    partitions::helpers::store_consumer_group_member_offset(
                         *id,
-                        topic_id,
+                        stream_id_num,
+                        topic_id_num,
                         partition_id,
                         offset,
                         &self.config.system,
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index efef5a1c..70baa14c 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -24,13 +24,12 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
-use crate::shard_trace;
 use crate::streaming::partitions::journal::Journal;
 use crate::streaming::polling_consumer::PollingConsumer;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
 use crate::streaming::session::Session;
 use crate::streaming::traits::MainOps;
-use crate::streaming::utils::{PooledBuffer, hash};
+use crate::streaming::utils::PooledBuffer;
 use crate::streaming::{partitions, streams, topics};
 use error_set::ErrContext;
 use iggy_common::{
@@ -49,24 +48,6 @@ impl IggyShard {
         partitioning: &Partitioning,
         batch: IggyMessagesBatchMut,
     ) -> Result<(), IggyError> {
-        // TODO: move to helpers.
-        fn calculate_partition_id_by_messages_key_hash(
-            shard_id: u16,
-            upperbound: usize,
-            messages_key: &[u8],
-        ) -> usize {
-            let messages_key_hash = hash::calculate_32(messages_key) as usize;
-            let partition_id = messages_key_hash % upperbound;
-            shard_trace!(
-                shard_id,
-                "Calculated partition ID: {} for messages key: {:?}, hash: {}",
-                partition_id,
-                messages_key,
-                messages_key_hash
-            );
-            partition_id
-        }
-
         let numeric_stream_id = self
             .streams2
             .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
@@ -108,11 +89,13 @@ impl IggyShard {
                         ) as usize),
                         PartitioningKind::MessagesKey => {
                             let upperbound = root.partitions().len();
-                            Ok(calculate_partition_id_by_messages_key_hash(
-                                self.id,
-                                upperbound,
-                                &partitioning.value,
-                            ))
+                            Ok(
+                                topics::helpers::calculate_partition_id_by_messages_key_hash(
+                                    self.id,
+                                    upperbound,
+                                    &partitioning.value,
+                                ),
+                            )
                         }
                     },
                 )?;
diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs
index 9244a0c4..b97b5dcd 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -27,13 +27,19 @@ use compio::fs::OpenOptions;
 use compio::io::AsyncWriteAtExt;
 use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType};
 use std::path::PathBuf;
-// TODO: compio has an `process` module, consider using that instead, but read the docs carefully // https://compio.rs/docs/compio/process
-use std::process::Command;
-use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
 use tracing::{error, info};
 
+// TODO(hubcio): compio has a `process` module, but it currently blocks the executor when the runtime
+// has thread_pool_limit(0) configured (which we do on non-macOS platforms in bootstrap.rs).
+// To use compio::process::Command, we need to either:
+// 1. Enable thread pool by removing/increasing thread_pool_limit(0)
+// 2. Use std::process::Command with compio::runtime::spawn_blocking (requires thread pool)
+// 3. Find alternative approach that doesn't rely on thread pool
+// See: https://compio.rs/docs/compio/process and bootstrap::create_shard_executor
+use std::process::Command;
+
 impl IggyShard {
     pub async fn get_snapshot(
         &self,
@@ -68,7 +74,7 @@ impl IggyShard {
 
         for snapshot_type in snapshot_types {
             info!("Processing snapshot type: {:?}", snapshot_type);
-            match get_command_result(snapshot_type, self.config.system.clone()).await {
+            match get_command_result(snapshot_type, &self.config.system).await {
                 Ok(temp_file) => {
                     info!(
                         "Got temp file for {:?}: {}",
@@ -215,7 +221,7 @@ async fn get_test_snapshot() -> Result<NamedTempFile, std::io::Error> {
     write_command_output_to_temp_file(Command::new("echo").arg("test")).await
 }
 
-async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> {
+async fn get_server_logs(config: &SystemConfig) -> Result<NamedTempFile, std::io::Error> {
     let base_directory = PathBuf::from(config.get_system_path());
     let logs_subdirectory = PathBuf::from(&config.logging.path);
     let logs_path = base_directory.join(logs_subdirectory);
@@ -228,7 +234,7 @@ async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, std
     write_command_output_to_temp_file(Command::new("sh").args(["-c", &list_and_cat])).await
 }
 
-async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> {
+async fn get_server_config(config: &SystemConfig) -> Result<NamedTempFile, std::io::Error> {
     let base_directory = PathBuf::from(config.get_system_path());
     let config_path = base_directory.join("runtime").join("current_config.toml");
 
@@ -237,7 +243,7 @@ async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, s
 
 async fn get_command_result(
     snapshot_type: &SystemSnapshotType,
-    config: Arc<SystemConfig>,
+    config: &SystemConfig,
 ) -> Result<NamedTempFile, std::io::Error> {
     match snapshot_type {
         SystemSnapshotType::FilesystemOverview => get_filesystem_overview().await,
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index cd497401..6e76b2ca 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -178,18 +178,7 @@ impl IggyShard {
             })?;
         }
 
-        //TODO: Tech debt.
-        let topic_ids = self
-            .streams2
-            .with_stream_by_id(stream_id, streams::helpers::get_topic_ids());
-
-        // Purge each topic in the stream using bypass auth
-        for topic_id in topic_ids {
-            let topic_identifier = Identifier::numeric(topic_id as u32).unwrap();
-            self.purge_topic2(session, stream_id, &topic_identifier)
-                .await?;
-        }
-        Ok(())
+        self.purge_stream2_base(stream_id).await
     }
 
     pub async fn purge_stream2_bypass_auth(&self, stream_id: &Identifier) -> Result<(), IggyError> {
diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs
index aef6754b..8aa7d963 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -515,21 +515,11 @@ impl IggyShard {
     pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         let client_id = session.client_id;
-        let user_id = session.get_user_id();
-        self.logout_user_base(user_id, client_id)?;
+        self.logout_user_base(client_id)?;
         Ok(())
     }
 
-    fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(), IggyError> {
-        // TODO(hubcio): not sure if user is needed here
-        let _user = self
-            .get_user(&Identifier::numeric(user_id)?)
-            .with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to get user with id: {}",
-                    user_id,
-                )
-            })?;
+    fn logout_user_base(&self, client_id: u32) -> Result<(), IggyError> {
         if client_id > 0 {
             let mut client_manager = self.client_manager.borrow_mut();
             client_manager.clear_user_id(client_id)?;
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 1ddc0454..87c29c36 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -16,6 +16,7 @@ use crate::{
             },
             topic2::{Topic, TopicRef, TopicRefMut, TopicRoot},
         },
+        utils::hash,
     },
 };
 use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
@@ -51,6 +52,23 @@ pub fn get_max_topic_size() -> impl FnOnce(ComponentsById<TopicRef>) -> MaxTopic
     |(root, _, _)| root.max_topic_size()
 }
 
+pub fn calculate_partition_id_by_messages_key_hash(
+    shard_id: u16,
+    upperbound: usize,
+    messages_key: &[u8],
+) -> usize {
+    let messages_key_hash = hash::calculate_32(messages_key) as usize;
+    let partition_id = messages_key_hash % upperbound;
+    shard_trace!(
+        shard_id,
+        "Calculated partition ID: {} for messages key: {:?}, hash: {}",
+        partition_id,
+        messages_key,
+        messages_key_hash
+    );
+    partition_id
+}
+
 pub fn delete_topic(topic_id: &Identifier) -> impl FnOnce(&Topics) -> Topic {
     |container| {
         let id = container.get_index(topic_id);

Reply via email to