This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new df3a05ba server compiles
df3a05ba is described below

commit df3a05ba7b250fccf8c32ef05b53de45400fd343
Author: numinex <g.koszy...@gmail.com>
AuthorDate: Fri Jun 27 15:09:01 2025 +0200

    server compiles
---
 core/server/src/binary/command.rs                  |  1 -
 .../create_consumer_group_handler.rs               |  2 +-
 .../consumer_groups/get_consumer_group_handler.rs  |  3 +-
 .../leave_consumer_group_handler.rs                |  2 +-
 .../handlers/messages/poll_messages_handler.rs     |  2 +-
 .../binary/handlers/system/get_clients_handler.rs  |  8 +--
 core/server/src/binary/sender.rs                   |  3 +-
 core/server/src/configs/defaults.rs                |  3 +-
 core/server/src/configs/server.rs                  |  3 +-
 core/server/src/lib.rs                             |  4 +-
 core/server/src/main.rs                            | 78 ++++++++++------------
 core/server/src/shard/mod.rs                       |  8 ++-
 core/server/src/shard/system/snapshot/mod.rs       |  8 +--
 core/server/src/shard/transmission/message.rs      |  8 +--
 core/server/src/streaming/partitions/messages.rs   |  2 +-
 core/server/src/streaming/partitions/partition.rs  |  8 +--
 core/server/src/streaming/segments/segment.rs      |  8 +--
 core/server/src/streaming/storage.rs               |  7 +-
 core/server/src/streaming/streams/stream.rs        | 10 +--
 core/server/src/streaming/streams/topics.rs        |  2 +-
 .../server/src/streaming/topics/consumer_groups.rs |  2 +-
 core/server/src/streaming/topics/messages.rs       |  2 +-
 core/server/src/streaming/topics/storage.rs        |  2 +-
 core/server/src/streaming/topics/topic.rs          |  8 +--
 core/server/src/streaming/utils/memory_pool.rs     |  4 +-
 core/server/src/tcp/sender.rs                      |  5 +-
 core/server/src/tcp/tcp_listener.rs                |  3 +
 27 files changed, 93 insertions(+), 103 deletions(-)

diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs
index 1ebac395..aa826590 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -68,7 +68,6 @@ use iggy_common::update_stream::UpdateStream;
 use iggy_common::update_topic::UpdateTopic;
 use iggy_common::update_user::UpdateUser;
 use iggy_common::*;
-use rustls::crypto::hash::Output;
 use strum::EnumString;
 use tracing::error;
 
diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 266b46a6..e6e1f239 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -60,7 +60,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
                         self.stream_id, self.topic_id, self.group_id
                     )
                 })?;
-            
+
         let stream = shard.find_stream(session, &self.stream_id)
             .with_error_context(|error| {
                 format!(
diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index 8f67192b..dab89c59 100644
--- a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -46,7 +46,8 @@ impl ServerCommandHandler for GetConsumerGroup {
         debug!("session: {session}, command: {self}");
 
         let stream_id = &self.stream_id;
-        let stream = shard.find_stream(session, &self.stream_id)
+        let stream = shard
+            .find_stream(session, &self.stream_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get stream for 
stream_id: {stream_id}"
diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index ba352bf9..18ee8b8c 100644
--- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
@@ -27,7 +28,6 @@ use iggy_common::IggyError;
 use iggy_common::leave_consumer_group::LeaveConsumerGroup;
 use std::rc::Rc;
 use tracing::{debug, instrument};
-use super::COMPONENT;
 
 impl ServerCommandHandler for LeaveConsumerGroup {
     fn code(&self) -> u32 {
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index a591f691..9d6d526b 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -20,8 +20,8 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
-use crate::shard::system::messages::PollingArgs;
 use crate::shard::IggyShard;
+use crate::shard::system::messages::PollingArgs;
 use crate::streaming::session::Session;
 use crate::to_iovec;
 use anyhow::Result;
diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs
index c73ac987..9d5241f4 100644
--- a/core/server/src/binary/handlers/system/get_clients_handler.rs
+++ b/core/server/src/binary/handlers/system/get_clients_handler.rs
@@ -43,11 +43,9 @@ impl ServerCommandHandler for GetClients {
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let clients = shard
-            .get_clients(session)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get clients, session: {session}")
-            })?;
+        let clients = shard.get_clients(session).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get clients, session: {session}")
+        })?;
         let clients = mapper::map_clients(clients).await;
         sender.send_ok_response(&clients).await?;
         Ok(())
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index ff405eb6..adbf05ee 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -54,11 +54,10 @@ macro_rules! forward_async_methods {
     }
 }
 
-
 pub trait Sender {
     fn read<B: IoBufMut>(
         &mut self,
-        buffer: B
+        buffer: B,
     ) -> impl Future<Output = (Result<usize, IggyError>, B)>;
     fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>;
     fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>;
diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs
index fc333ec7..82b560cf 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -36,7 +36,6 @@ use crate::configs::system::{
 use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
 use iggy_common::IggyByteSize;
 use iggy_common::IggyDuration;
-use std::rc::Rc;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -52,7 +51,7 @@ impl Default for ServerConfig {
             heartbeat: HeartbeatConfig::default(),
             message_saver: MessageSaverConfig::default(),
             personal_access_token: PersonalAccessTokenConfig::default(),
-            system: Rc::new(SystemConfig::default()),
+            system: Arc::new(SystemConfig::default()),
             quic: QuicConfig::default(),
             tcp: TcpConfig::default(),
             http: HttpConfig::default(),
diff --git a/core/server/src/configs/server.rs b/core/server/src/configs/server.rs
index 059fd1c4..252df9a7 100644
--- a/core/server/src/configs/server.rs
+++ b/core/server/src/configs/server.rs
@@ -31,7 +31,6 @@ use iggy_common::Validatable;
 use serde::{Deserialize, Serialize};
 use serde_with::DisplayFromStr;
 use serde_with::serde_as;
-use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -41,7 +40,7 @@ pub struct ServerConfig {
     pub message_saver: MessageSaverConfig,
     pub personal_access_token: PersonalAccessTokenConfig,
     pub heartbeat: HeartbeatConfig,
-    pub system: Rc<SystemConfig>,
+    pub system: Arc<SystemConfig>,
     pub quic: QuicConfig,
     pub tcp: TcpConfig,
     pub http: HttpConfig,
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 4a5dbe53..f618d3e0 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -18,8 +18,8 @@
 
 #[cfg(not(feature = "disable-mimalloc"))]
 use mimalloc::MiMalloc;
-use nix::libc::iovec;
 use nix::libc::c_void;
+use nix::libc::iovec;
 
 #[cfg(not(feature = "disable-mimalloc"))]
 #[global_allocator]
@@ -61,4 +61,4 @@ pub fn to_iovec<T>(data: &[T]) -> iovec {
         iov_base: data.as_ptr() as *mut c_void,
         iov_len: data.len() * std::mem::size_of::<T>(),
     }
-}
\ No newline at end of file
+}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 78b3a1e9..436b3829 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use std::rc::Rc;
 use std::sync::Arc;
 use std::thread::available_parallelism;
 
@@ -32,6 +33,7 @@ use server::bootstrap::{
     create_shard_executor, load_config, resolve_persister,
 };
 use server::configs::config_provider::{self};
+use server::configs::server::ServerConfig;
 #[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
 #[cfg(feature = "tokio-console")]
@@ -77,54 +79,44 @@ fn main() -> Result<(), ServerError> {
     // TODO: I think we could get rid of config provider, since we support only TOML
     // as config provider.
     let config_provider = config_provider::resolve(&args.config_provider)?;
-    let config = std::thread::scope(|scope| {
-        let config = scope
-            .spawn(move || {
-                let mut rt = create_default_executor::<monoio::IoUringDriver>();
-                rt.block_on(load_config(&config_provider))
-            })
-            .join()
-            .expect("Failed to load config");
-        config
-    })?;
+    // Load config and create directories.
+    // Remove `local_data` directory if run with `--fresh` flag.
+    let mut rt = create_default_executor::<monoio::IoUringDriver>();
+    let config = rt
+        .block_on(async {
+            let config = load_config(&config_provider)
+                .await
+                .with_error_context(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to load config during bootstrap")
+                })?;
+            if args.fresh {
+                let system_path = config.system.get_system_path();
+                if monoio::fs::metadata(&system_path).await.is_ok() {
+                    println!(
+                        "Removing system path at: {} because `--fresh` flag 
was set",
+                        system_path
+                    );
+                    //TODO: Impl dir walk and remove the files
+                    /*
+                    if let Err(e) = tokio::fs::remove_dir_all(&system_path).await {
+                        eprintln!("Failed to remove system path at {}: {}", system_path, e);
+                    }
+                    */
+                }
+            }
+
+            // Create directories.
+            create_directories(&config.system).await?;
+            Ok::<ServerConfig, ServerError>(config)
+        })
+        .with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to load config")
+        })?;
 
     // Initialize logging
     let mut logging = Logging::new(config.telemetry.clone());
     logging.early_init();
 
-    // Create directories.
-    // Remove `local_data` directory if run with `--fresh` flag.
-    std::thread::scope(|scope| {
-        scope
-            .spawn(|| {
-                let mut rt = create_default_executor::<monoio::IoUringDriver>();
-                rt.block_on(async {
-                    if args.fresh {
-                        let system_path = config.system.get_system_path();
-                        if monoio::fs::metadata(&system_path).await.is_ok() {
-                            println!(
-                                "Removing system path at: {} because `--fresh` 
flag was set",
-                                system_path
-                            );
-                            //TODO: Impl dir walk and remove the files
-                            /*
-                            if let Err(e) = tokio::fs::remove_dir_all(&system_path).await {
-                                eprintln!("Failed to remove system path at {}: {}", system_path, e);
-                            }
-                            */
-                        }
-                    }
-
-                    // Create directories.
-                    create_directories(&config.system).await?;
-                    Ok::<(), ServerError>(())
-                })
-            })
-            .join()
-            .expect("Failed join thread")
-    })
-    .with_error_context(|err| format!("Failed to init server: {err}"))?;
-
     // TODO: Make this configurable from config as a range
     // for example this instance of Iggy will use cores from 0..4
     let available_cpus = available_parallelism().expect("Failed to get num of cores");
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index b5e48525..dbd94b68 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -426,7 +426,7 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
-    pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: Rc<ShardEvent>) {
+    pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: ShardEvent) {
         self.shards
             .iter()
             .filter_map(|shard| {
@@ -437,8 +437,12 @@ impl IggyShard {
                 }
             })
             .map(|conn| {
-                let message = ShardMessage::Event(event.clone());
+                //TODO: Fixme
+                /*
+                let message = ShardMessage::Event(event);
                 conn.send(ShardFrame::new(client_id, message, None));
+                */
+                ()
             })
             .collect::<Vec<_>>();
     }
diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs
index b4de2992..eb726509 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -27,7 +27,7 @@ use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, System
 use monoio::fs::{File, OpenOptions};
 use std::io::Cursor;
 use std::path::PathBuf;
-use std::rc::Rc;
+use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -191,7 +191,7 @@ async fn get_test_snapshot() -> Result<NamedTempFile, std::io::Error> {
     write_command_output_to_temp_file(Command::new("echo").arg("test")).await
 }
 
-async fn get_server_logs(config: Rc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> {
+async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> {
     let base_directory = PathBuf::from(config.get_system_path());
     let logs_subdirectory = PathBuf::from(&config.logging.path);
     let logs_path = base_directory.join(logs_subdirectory);
@@ -204,7 +204,7 @@ async fn get_server_logs(config: Rc<SystemConfig>) -> Result<NamedTempFile, std:
     write_command_output_to_temp_file(Command::new("sh").args(["-c", &list_and_cat])).await
 }
 
-async fn get_server_config(config: Rc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> {
+async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> {
     let base_directory = PathBuf::from(config.get_system_path());
     let config_path = base_directory.join("runtime").join("current_config.toml");
 
@@ -213,7 +213,7 @@ async fn get_server_config(config: Rc<SystemConfig>) -> Result<NamedTempFile, st
 
 async fn get_command_result(
     snapshot_type: &SystemSnapshotType,
-    config: Rc<SystemConfig>,
+    config: Arc<SystemConfig>,
 ) -> Result<NamedTempFile, std::io::Error> {
     match snapshot_type {
         SystemSnapshotType::FilesystemOverview => get_filesystem_overview().await,
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index 8d1e66eb..cbf4fdd4 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -22,12 +22,12 @@ use crate::{binary::command::ServerCommand, streaming::session::Session};
 #[derive(Debug)]
 pub enum ShardMessage {
     Command(ServerCommand),
-    Event(Rc<ShardEvent>),
+    Event(ShardEvent),
 }
 
 #[derive(Debug)]
 pub enum ShardEvent {
-    NewSession(Rc<Session>),
+    NewSession(),
 }
 
 impl From<ServerCommand> for ShardMessage {
@@ -36,8 +36,8 @@ impl From<ServerCommand> for ShardMessage {
     }
 }
 
-impl From<Rc<ShardEvent>> for ShardMessage {
-    fn from(event: Rc<ShardEvent>) -> Self {
+impl From<ShardEvent> for ShardMessage {
+    fn from(event: ShardEvent) -> Self {
         ShardMessage::Event(event)
     }
 }
diff --git a/core/server/src/streaming/partitions/messages.rs b/core/server/src/streaming/partitions/messages.rs
index a2d71c8e..b136aeea 100644
--- a/core/server/src/streaming/partitions/messages.rs
+++ b/core/server/src/streaming/partitions/messages.rs
@@ -711,7 +711,7 @@ mod tests {
         let partition_id = 3;
         let with_segment = true;
         let temp_dir = TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: temp_dir.path().to_path_buf().to_str().unwrap().to_string(),
             message_deduplication: MessageDeduplicationConfig {
                 enabled: deduplication_enabled,
diff --git a/core/server/src/streaming/partitions/partition.rs b/core/server/src/streaming/partitions/partition.rs
index 4a00ca85..1b8bacf8 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -59,7 +59,7 @@ pub struct Partition {
     pub(crate) consumer_offsets: DashMap<u32, ConsumerOffset>,
     pub(crate) consumer_group_offsets: DashMap<u32, ConsumerOffset>,
     pub(crate) segments: Vec<Segment>,
-    pub(crate) config: Rc<SystemConfig>,
+    pub(crate) config: Arc<SystemConfig>,
     pub(crate) storage: Rc<SystemStorage>,
 }
 
@@ -89,7 +89,7 @@ impl Partition {
         topic_id: u32,
         partition_id: u32,
         with_segment: bool,
-        config: Rc<SystemConfig>,
+        config: Arc<SystemConfig>,
         storage: Rc<SystemStorage>,
         message_expiry: IggyExpiry,
         messages_count_of_parent_stream: Arc<AtomicU64>,
@@ -217,7 +217,7 @@ mod tests {
     #[tokio::test]
     async fn should_be_created_with_a_single_segment_given_valid_parameters() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
@@ -266,7 +266,7 @@ mod tests {
     #[tokio::test]
     async fn should_not_initialize_segments_given_false_with_segment_parameter() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs
index 252fe7f1..60886df6 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -61,7 +61,7 @@ pub struct Segment {
     pub(super) index_reader: Option<IndexReader>,
     pub(super) message_expiry: IggyExpiry,
     pub(super) accumulator: MessagesAccumulator,
-    pub(super) config: Rc<SystemConfig>,
+    pub(super) config: Arc<SystemConfig>,
     pub(super) indexes: IggyIndexesMut,
     pub(super) messages_size: Arc<AtomicU64>,
     pub(super) indexes_size: Arc<AtomicU64>,
@@ -74,7 +74,7 @@ impl Segment {
         topic_id: u32,
         partition_id: u32,
         start_offset: u64,
-        config: Rc<SystemConfig>,
+        config: Arc<SystemConfig>,
         message_expiry: IggyExpiry,
         size_of_parent_stream: Arc<AtomicU64>,
         size_of_parent_topic: Arc<AtomicU64>,
@@ -451,7 +451,7 @@ mod tests {
         let topic_id = 2;
         let partition_id = 3;
         let start_offset = 0;
-        let config = Rc::new(SystemConfig::default());
+        let config = Arc::new(SystemConfig::default());
         let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
         let messages_file_path = Segment::get_messages_file_path(&path);
         let index_path = Segment::get_index_path(&path);
@@ -500,7 +500,7 @@ mod tests {
         let topic_id = 2;
         let partition_id = 3;
         let start_offset = 0;
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             segment: SegmentConfig {
                 cache_indexes: CacheIndexesConfig::None,
                 ..Default::default()
diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs
index eaf96dc3..4b41a2b3 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -33,7 +33,6 @@ use iggy_common::IggyError;
 use mockall::automock;
 use std::fmt::Debug;
 use std::future::Future;
-use std::rc::Rc;
 use std::sync::Arc;
 
 macro_rules! forward_async_methods {
@@ -137,7 +136,7 @@ pub trait PartitionStorage {
 
 #[derive(Debug)]
 pub struct SystemStorage {
-    pub info: Rc<SystemInfoStorageKind>,
+    pub info: Arc<SystemInfoStorageKind>,
     pub stream: Arc<StreamStorageKind>,
     pub topic: Arc<TopicStorageKind>,
     pub partition: Arc<PartitionStorageKind>,
@@ -145,9 +144,9 @@ pub struct SystemStorage {
 }
 
 impl SystemStorage {
-    pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) -> Self {
+    pub fn new(config: Arc<SystemConfig>, persister: Arc<PersisterKind>) -> Self {
         Self {
-            info: Rc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
+            info: Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
                 config.get_state_info_path(),
                 persister.clone(),
             ))),
diff --git a/core/server/src/streaming/streams/stream.rs b/core/server/src/streaming/streams/stream.rs
index 03681b75..6a858fd5 100644
--- a/core/server/src/streaming/streams/stream.rs
+++ b/core/server/src/streaming/streams/stream.rs
@@ -40,7 +40,7 @@ pub struct Stream {
     pub segments_count: Arc<AtomicU32>,
     pub(crate) topics: AHashMap<u32, Topic>,
     pub(crate) topics_ids: AHashMap<String, u32>,
-    pub(crate) config: Rc<SystemConfig>,
+    pub(crate) config: Arc<SystemConfig>,
     pub(crate) storage: Rc<SystemStorage>,
 }
 
@@ -48,7 +48,7 @@ impl Stream {
     pub fn empty(
         id: u32,
         name: &str,
-        config: Rc<SystemConfig>,
+        config: Arc<SystemConfig>,
         storage: Rc<SystemStorage>,
     ) -> Self {
         Stream::create(id, name, config, storage)
@@ -57,7 +57,7 @@ impl Stream {
     pub fn create(
         id: u32,
         name: &str,
-        config: Rc<SystemConfig>,
+        config: Arc<SystemConfig>,
         storage: Rc<SystemStorage>,
     ) -> Self {
         let path = config.get_stream_path(id);
@@ -106,7 +106,7 @@ mod tests {
     #[test]
     fn should_be_created_given_valid_parameters() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
@@ -117,7 +117,7 @@ mod tests {
         MemoryPool::init_pool(config.clone());
         let id = 1;
         let name = "test";
-        let config = Rc::new(SystemConfig::default());
+        let config = Arc::new(SystemConfig::default());
         let path = config.get_stream_path(id);
         let topics_path = config.get_topics_path(id);
 
diff --git a/core/server/src/streaming/streams/topics.rs b/core/server/src/streaming/streams/topics.rs
index ed350bee..1d47cc36 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -287,7 +287,7 @@ mod tests {
     #[tokio::test]
     async fn should_get_topic_by_id_and_name() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs
index bf249d2e..28a8705c 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -440,7 +440,7 @@ mod tests {
 
     async fn get_topic() -> Topic {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
diff --git a/core/server/src/streaming/topics/messages.rs b/core/server/src/streaming/topics/messages.rs
index e8c52385..c01687a6 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -322,7 +322,7 @@ mod tests {
 
     async fn init_topic(partitions_count: u32) -> Topic {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs
index bfb7ead0..db74fc98 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -34,7 +34,7 @@ use std::path::Path;
 use std::sync::Arc;
 use tokio::fs;
 use tokio::fs::create_dir_all;
-use tokio::sync::{Mutex};
+use tokio::sync::Mutex;
 use tracing::{error, info, warn};
 
 #[derive(Debug)]
diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs
index 7e8ca4ac..2e9e38c9 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -49,7 +49,7 @@ pub struct Topic {
     pub(crate) messages_count_of_parent_stream: Arc<AtomicU64>,
     pub(crate) messages_count: Arc<AtomicU64>,
     pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>,
-    pub(crate) config: Rc<SystemConfig>,
+    pub(crate) config: Arc<SystemConfig>,
     pub(crate) partitions: AHashMap<u32, IggyRwLock<Partition>>,
     pub(crate) storage: Rc<SystemStorage>,
     pub(crate) consumer_groups: RefCell<AHashMap<u32, ConsumerGroup>>,
@@ -72,7 +72,7 @@ impl Topic {
         size_of_parent_stream: Arc<AtomicU64>,
         messages_count_of_parent_stream: Arc<AtomicU64>,
         segments_count_of_parent_stream: Arc<AtomicU32>,
-        config: Rc<SystemConfig>,
+        config: Arc<SystemConfig>,
         storage: Rc<SystemStorage>,
     ) -> Topic {
         Topic::create(
@@ -100,7 +100,7 @@ impl Topic {
         topic_id: u32,
         name: &str,
         partitions_count: u32,
-        config: Rc<SystemConfig>,
+        config: Arc<SystemConfig>,
         storage: Rc<SystemStorage>,
         size_of_parent_stream: Arc<AtomicU64>,
         messages_count_of_parent_stream: Arc<AtomicU64>,
@@ -290,7 +290,7 @@ mod tests {
     #[tokio::test]
     async fn should_be_created_given_valid_parameters() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Rc::new(SystemConfig {
+        let config = Arc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs
index e05b7359..198e8626 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -154,7 +154,7 @@ impl MemoryPool {
     }
 
     /// Initialize the global pool from the given config.
-    pub fn init_pool(config: Rc<SystemConfig>) {
+    pub fn init_pool(config: Arc<SystemConfig>) {
         let is_enabled = config.memory_pool.enabled;
         let memory_limit = config.memory_pool.size.as_bytes_usize();
         let bucket_capacity = config.memory_pool.bucket_capacity as usize;
@@ -468,7 +468,7 @@ mod tests {
 
     fn initialize_pool_for_tests() {
         TEST_INIT.call_once(|| {
-            let config = Rc::new(SystemConfig {
+            let config = Arc::new(SystemConfig {
                 memory_pool: MemoryPoolConfig {
                     enabled: true,
                     size: IggyByteSize::from_str("4GiB").unwrap(),
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 22796812..2af9f494 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -28,10 +28,7 @@ use tracing::debug;
 
 const STATUS_OK: &[u8] = &[0; 4];
 
-pub(crate) async fn read<T, B>(
-    stream: &mut T,
-    buffer: B,
-) -> (Result<usize, IggyError>, B)
+pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize, IggyError>, B)
 where
     T: AsyncReadRent + AsyncWriteRent + Unpin,
     B: IoBufMut,
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index ffa01eb1..ac29e2b9 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -54,8 +54,11 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) {
                     //TODO: Those can be shared with other shards.
                     shard.add_active_session(session.clone());
                     // Broadcast session to all shards.
+                    //TODO: Fixme
+                    /*
                     let event = Rc::new(ShardEvent::NewSession(session.clone()));
                     shard.broadcast_event_to_all_shards(session.client_id, event);
+                    */
 
                     let _client_id = session.client_id;
                     info!("Created new session: {session}");

Reply via email to