This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push: new df3a05ba server compiles df3a05ba is described below commit df3a05ba7b250fccf8c32ef05b53de45400fd343 Author: numinex <g.koszy...@gmail.com> AuthorDate: Fri Jun 27 15:09:01 2025 +0200 server compiles --- core/server/src/binary/command.rs | 1 - .../create_consumer_group_handler.rs | 2 +- .../consumer_groups/get_consumer_group_handler.rs | 3 +- .../leave_consumer_group_handler.rs | 2 +- .../handlers/messages/poll_messages_handler.rs | 2 +- .../binary/handlers/system/get_clients_handler.rs | 8 +-- core/server/src/binary/sender.rs | 3 +- core/server/src/configs/defaults.rs | 3 +- core/server/src/configs/server.rs | 3 +- core/server/src/lib.rs | 4 +- core/server/src/main.rs | 78 ++++++++++------------ core/server/src/shard/mod.rs | 8 ++- core/server/src/shard/system/snapshot/mod.rs | 8 +-- core/server/src/shard/transmission/message.rs | 8 +-- core/server/src/streaming/partitions/messages.rs | 2 +- core/server/src/streaming/partitions/partition.rs | 8 +-- core/server/src/streaming/segments/segment.rs | 8 +-- core/server/src/streaming/storage.rs | 7 +- core/server/src/streaming/streams/stream.rs | 10 +-- core/server/src/streaming/streams/topics.rs | 2 +- .../server/src/streaming/topics/consumer_groups.rs | 2 +- core/server/src/streaming/topics/messages.rs | 2 +- core/server/src/streaming/topics/storage.rs | 2 +- core/server/src/streaming/topics/topic.rs | 8 +-- core/server/src/streaming/utils/memory_pool.rs | 4 +- core/server/src/tcp/sender.rs | 5 +- core/server/src/tcp/tcp_listener.rs | 3 + 27 files changed, 93 insertions(+), 103 deletions(-) diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index 1ebac395..aa826590 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -68,7 +68,6 @@ use iggy_common::update_stream::UpdateStream; use iggy_common::update_topic::UpdateTopic; use iggy_common::update_user::UpdateUser; use iggy_common::*; -use rustls::crypto::hash::Output; use strum::EnumString; use tracing::error; diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 266b46a6..e6e1f239 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -60,7 +60,7 @@ impl ServerCommandHandler for CreateConsumerGroup { self.stream_id, self.topic_id, self.group_id ) })?; - + let stream = shard.find_stream(session, &self.stream_id) .with_error_context(|error| { format!( diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs index 8f67192b..dab89c59 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs @@ -46,7 +46,8 @@ impl ServerCommandHandler for GetConsumerGroup { debug!("session: {session}, command: {self}"); let stream_id = &self.stream_id; - let stream = shard.find_stream(session, &self.stream_id) + let stream = shard + .find_stream(session, &self.stream_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get stream for stream_id: {stream_id}" diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index ba352bf9..18ee8b8c 100644 --- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -16,6 +16,7 @@ * under the License. */ +use super::COMPONENT; use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; @@ -27,7 +28,6 @@ use iggy_common::IggyError; use iggy_common::leave_consumer_group::LeaveConsumerGroup; use std::rc::Rc; use tracing::{debug, instrument}; -use super::COMPONENT; impl ServerCommandHandler for LeaveConsumerGroup { fn code(&self) -> u32 { diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index a591f691..9d6d526b 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -20,8 +20,8 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::messages::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; -use crate::shard::system::messages::PollingArgs; use crate::shard::IggyShard; +use crate::shard::system::messages::PollingArgs; use crate::streaming::session::Session; use crate::to_iovec; use anyhow::Result; diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs index c73ac987..9d5241f4 100644 --- a/core/server/src/binary/handlers/system/get_clients_handler.rs +++ b/core/server/src/binary/handlers/system/get_clients_handler.rs @@ -43,11 +43,9 @@ impl ServerCommandHandler for GetClients { ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let clients = shard - .get_clients(session) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get clients, session: {session}") - })?; + let clients = shard.get_clients(session).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get clients, session: {session}") + })?; let clients = mapper::map_clients(clients).await; sender.send_ok_response(&clients).await?; Ok(()) diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index ff405eb6..adbf05ee 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -54,11 +54,10 @@ macro_rules! forward_async_methods { } } - pub trait Sender { fn read<B: IoBufMut>( &mut self, - buffer: B + buffer: B, ) -> impl Future<Output = (Result<usize, IggyError>, B)>; fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>; diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index fc333ec7..82b560cf 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -36,7 +36,6 @@ use crate::configs::system::{ use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use iggy_common::IggyByteSize; use iggy_common::IggyDuration; -use std::rc::Rc; use std::sync::Arc; use std::time::Duration; @@ -52,7 +51,7 @@ impl Default for ServerConfig { heartbeat: HeartbeatConfig::default(), message_saver: MessageSaverConfig::default(), personal_access_token: PersonalAccessTokenConfig::default(), - system: Rc::new(SystemConfig::default()), + system: Arc::new(SystemConfig::default()), quic: QuicConfig::default(), tcp: TcpConfig::default(), http: HttpConfig::default(), diff --git a/core/server/src/configs/server.rs b/core/server/src/configs/server.rs index 059fd1c4..252df9a7 100644 --- a/core/server/src/configs/server.rs +++ b/core/server/src/configs/server.rs @@ -31,7 +31,6 @@ use iggy_common::Validatable; use serde::{Deserialize, Serialize}; use serde_with::DisplayFromStr; use serde_with::serde_as; -use std::rc::Rc; use std::str::FromStr; use std::sync::Arc; @@ -41,7 +40,7 @@ pub struct ServerConfig { pub message_saver: MessageSaverConfig, pub personal_access_token: PersonalAccessTokenConfig, pub heartbeat: HeartbeatConfig, - pub system: Rc<SystemConfig>, + pub system: Arc<SystemConfig>, pub quic: QuicConfig, pub tcp: TcpConfig, pub http: HttpConfig, diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 4a5dbe53..f618d3e0 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -18,8 +18,8 @@ #[cfg(not(feature = "disable-mimalloc"))] use mimalloc::MiMalloc; -use nix::libc::iovec; use nix::libc::c_void; +use nix::libc::iovec; #[cfg(not(feature = "disable-mimalloc"))] #[global_allocator] @@ -61,4 +61,4 @@ pub fn to_iovec<T>(data: &[T]) -> iovec { iov_base: data.as_ptr() as *mut c_void, iov_len: data.len() * std::mem::size_of::<T>(), } -} \ No newline at end of file +} diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 78b3a1e9..436b3829 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -16,6 +16,7 @@ * under the License. */ +use std::rc::Rc; use std::sync::Arc; use std::thread::available_parallelism; @@ -32,6 +33,7 @@ use server::bootstrap::{ create_shard_executor, load_config, resolve_persister, }; use server::configs::config_provider::{self}; +use server::configs::server::ServerConfig; #[cfg(not(feature = "tokio-console"))] use server::log::logger::Logging; #[cfg(feature = "tokio-console")] @@ -77,54 +79,44 @@ fn main() -> Result<(), ServerError> { // TODO: I think we could get rid of config provider, since we support only TOML // as config provider. let config_provider = config_provider::resolve(&args.config_provider)?; - let config = std::thread::scope(|scope| { - let config = scope - .spawn(move || { - let mut rt = create_default_executor::<monoio::IoUringDriver>(); - rt.block_on(load_config(&config_provider)) - }) - .join() - .expect("Failed to load config"); - config - })?; + // Load config and create directories. + // Remove `local_data` directory if run with `--fresh` flag. + let mut rt = create_default_executor::<monoio::IoUringDriver>(); + let config = rt + .block_on(async { + let config = load_config(&config_provider) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load config during bootstrap") + })?; + if args.fresh { + let system_path = config.system.get_system_path(); + if monoio::fs::metadata(&system_path).await.is_ok() { + println!( + "Removing system path at: {} because `--fresh` flag was set", + system_path + ); + //TODO: Impl dir walk and remove the files + /* + if let Err(e) = tokio::fs::remove_dir_all(&system_path).await { + eprintln!("Failed to remove system path at {}: {}", system_path, e); + } + */ + } + } + + // Create directories. + create_directories(&config.system).await?; + Ok::<ServerConfig, ServerError>(config) + }) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load config") + })?; // Initialize logging let mut logging = Logging::new(config.telemetry.clone()); logging.early_init(); - // Create directories. - // Remove `local_data` directory if run with `--fresh` flag. - std::thread::scope(|scope| { - scope - .spawn(|| { - let mut rt = create_default_executor::<monoio::IoUringDriver>(); - rt.block_on(async { - if args.fresh { - let system_path = config.system.get_system_path(); - if monoio::fs::metadata(&system_path).await.is_ok() { - println!( - "Removing system path at: {} because `--fresh` flag was set", - system_path - ); - //TODO: Impl dir walk and remove the files - /* - if let Err(e) = tokio::fs::remove_dir_all(&system_path).await { - eprintln!("Failed to remove system path at {}: {}", system_path, e); - } - */ - } - } - - // Create directories. - create_directories(&config.system).await?; - Ok::<(), ServerError>(()) - }) - }) - .join() - .expect("Failed join thread") - }) - .with_error_context(|err| format!("Failed to init server: {err}"))?; - // TODO: Make this configurable from config as a range // for example this instance of Iggy will use cores from 0..4 let available_cpus = available_parallelism().expect("Failed to get num of cores"); diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index b5e48525..dbd94b68 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -426,7 +426,7 @@ impl IggyShard { self.shards.len() as u32 } - pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: Rc<ShardEvent>) { + pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: ShardEvent) { self.shards .iter() .filter_map(|shard| { @@ -437,8 +437,12 @@ impl IggyShard { } }) .map(|conn| { - let message = ShardMessage::Event(event.clone()); + //TODO: Fixme + /* + let message = ShardMessage::Event(event); conn.send(ShardFrame::new(client_id, message, None)); + */ + () }) .collect::<Vec<_>>(); } diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index b4de2992..eb726509 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -27,7 +27,7 @@ use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, System use monoio::fs::{File, OpenOptions}; use std::io::Cursor; use std::path::PathBuf; -use std::rc::Rc; +use std::sync::Arc; use std::time::Instant; use tempfile::NamedTempFile; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -191,7 +191,7 @@ async fn get_test_snapshot() -> Result<NamedTempFile, std::io::Error> { write_command_output_to_temp_file(Command::new("echo").arg("test")).await } -async fn get_server_logs(config: Rc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { +async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { let base_directory = PathBuf::from(config.get_system_path()); let logs_subdirectory = PathBuf::from(&config.logging.path); let logs_path = base_directory.join(logs_subdirectory); @@ -204,7 +204,7 @@ async fn get_server_logs(config: Rc<SystemConfig>) -> Result<NamedTempFile, std: write_command_output_to_temp_file(Command::new("sh").args(["-c", &list_and_cat])).await } -async fn get_server_config(config: Rc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { +async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { let base_directory = PathBuf::from(config.get_system_path()); let config_path = base_directory.join("runtime").join("current_config.toml"); @@ -213,7 +213,7 @@ async fn get_server_config(config: Rc<SystemConfig>) -> Result<NamedTempFile, st async fn get_command_result( snapshot_type: &SystemSnapshotType, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, ) -> Result<NamedTempFile, std::io::Error> { match snapshot_type { SystemSnapshotType::FilesystemOverview => get_filesystem_overview().await, diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index 8d1e66eb..cbf4fdd4 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -22,12 +22,12 @@ use crate::{binary::command::ServerCommand, streaming::session::Session}; #[derive(Debug)] pub enum ShardMessage { Command(ServerCommand), - Event(Rc<ShardEvent>), + Event(ShardEvent), } #[derive(Debug)] pub enum ShardEvent { - NewSession(Rc<Session>), + NewSession(), } impl From<ServerCommand> for ShardMessage { @@ -36,8 +36,8 @@ impl From<ServerCommand> for ShardMessage { } } -impl From<Rc<ShardEvent>> for ShardMessage { - fn from(event: Rc<ShardEvent>) -> Self { +impl From<ShardEvent> for ShardMessage { + fn from(event: ShardEvent) -> Self { ShardMessage::Event(event) } } diff --git a/core/server/src/streaming/partitions/messages.rs b/core/server/src/streaming/partitions/messages.rs index a2d71c8e..b136aeea 100644 --- a/core/server/src/streaming/partitions/messages.rs +++ b/core/server/src/streaming/partitions/messages.rs @@ -711,7 +711,7 @@ mod tests { let partition_id = 3; let with_segment = true; let temp_dir = TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: temp_dir.path().to_path_buf().to_str().unwrap().to_string(), message_deduplication: MessageDeduplicationConfig { enabled: deduplication_enabled, diff --git a/core/server/src/streaming/partitions/partition.rs b/core/server/src/streaming/partitions/partition.rs index 4a00ca85..1b8bacf8 100644 --- a/core/server/src/streaming/partitions/partition.rs +++ b/core/server/src/streaming/partitions/partition.rs @@ -59,7 +59,7 @@ pub struct Partition { pub(crate) consumer_offsets: DashMap<u32, ConsumerOffset>, pub(crate) consumer_group_offsets: DashMap<u32, ConsumerOffset>, pub(crate) segments: Vec<Segment>, - pub(crate) config: Rc<SystemConfig>, + pub(crate) config: Arc<SystemConfig>, pub(crate) storage: Rc<SystemStorage>, } @@ -89,7 +89,7 @@ impl Partition { topic_id: u32, partition_id: u32, with_segment: bool, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, storage: Rc<SystemStorage>, message_expiry: IggyExpiry, messages_count_of_parent_stream: Arc<AtomicU64>, @@ -217,7 +217,7 @@ mod tests { #[tokio::test] async fn should_be_created_with_a_single_segment_given_valid_parameters() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); @@ -266,7 +266,7 @@ mod tests { #[tokio::test] async fn should_not_initialize_segments_given_false_with_segment_parameter() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs index 252fe7f1..60886df6 100644 --- a/core/server/src/streaming/segments/segment.rs +++ b/core/server/src/streaming/segments/segment.rs @@ -61,7 +61,7 @@ pub struct Segment { pub(super) index_reader: Option<IndexReader>, pub(super) message_expiry: IggyExpiry, pub(super) accumulator: MessagesAccumulator, - pub(super) config: Rc<SystemConfig>, + pub(super) config: Arc<SystemConfig>, pub(super) indexes: IggyIndexesMut, pub(super) messages_size: Arc<AtomicU64>, pub(super) indexes_size: Arc<AtomicU64>, @@ -74,7 +74,7 @@ impl Segment { topic_id: u32, partition_id: u32, start_offset: u64, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, message_expiry: IggyExpiry, size_of_parent_stream: Arc<AtomicU64>, size_of_parent_topic: Arc<AtomicU64>, @@ -451,7 +451,7 @@ mod tests { let topic_id = 2; let partition_id = 3; let start_offset = 0; - let config = Rc::new(SystemConfig::default()); + let config = Arc::new(SystemConfig::default()); let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset); let messages_file_path = Segment::get_messages_file_path(&path); let index_path = Segment::get_index_path(&path); @@ -500,7 +500,7 @@ mod tests { let topic_id = 2; let partition_id = 3; let start_offset = 0; - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { segment: SegmentConfig { cache_indexes: CacheIndexesConfig::None, ..Default::default() diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs index eaf96dc3..4b41a2b3 100644 --- a/core/server/src/streaming/storage.rs +++ b/core/server/src/streaming/storage.rs @@ -33,7 +33,6 @@ use iggy_common::IggyError; use mockall::automock; use std::fmt::Debug; use std::future::Future; -use std::rc::Rc; use std::sync::Arc; macro_rules! forward_async_methods { @@ -137,7 +136,7 @@ pub trait PartitionStorage { #[derive(Debug)] pub struct SystemStorage { - pub info: Rc<SystemInfoStorageKind>, + pub info: Arc<SystemInfoStorageKind>, pub stream: Arc<StreamStorageKind>, pub topic: Arc<TopicStorageKind>, pub partition: Arc<PartitionStorageKind>, @@ -145,9 +144,9 @@ pub struct SystemStorage { } impl SystemStorage { - pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) -> Self { + pub fn new(config: Arc<SystemConfig>, persister: Arc<PersisterKind>) -> Self { Self { - info: Rc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new( + info: Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new( config.get_state_info_path(), persister.clone(), ))), diff --git a/core/server/src/streaming/streams/stream.rs b/core/server/src/streaming/streams/stream.rs index 03681b75..6a858fd5 100644 --- a/core/server/src/streaming/streams/stream.rs +++ b/core/server/src/streaming/streams/stream.rs @@ -40,7 +40,7 @@ pub struct Stream { pub segments_count: Arc<AtomicU32>, pub(crate) topics: AHashMap<u32, Topic>, pub(crate) topics_ids: AHashMap<String, u32>, - pub(crate) config: Rc<SystemConfig>, + pub(crate) config: Arc<SystemConfig>, pub(crate) storage: Rc<SystemStorage>, } @@ -48,7 +48,7 @@ impl Stream { pub fn empty( id: u32, name: &str, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, storage: Rc<SystemStorage>, ) -> Self { Stream::create(id, name, config, storage) @@ -57,7 +57,7 @@ impl Stream { pub fn create( id: u32, name: &str, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, storage: Rc<SystemStorage>, ) -> Self { let path = config.get_stream_path(id); @@ -106,7 +106,7 @@ mod tests { #[test] fn should_be_created_given_valid_parameters() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); @@ -117,7 +117,7 @@ mod tests { MemoryPool::init_pool(config.clone()); let id = 1; let name = "test"; - let config = Rc::new(SystemConfig::default()); + let config = Arc::new(SystemConfig::default()); let path = config.get_stream_path(id); let topics_path = config.get_topics_path(id); diff --git a/core/server/src/streaming/streams/topics.rs b/core/server/src/streaming/streams/topics.rs index ed350bee..1d47cc36 100644 --- a/core/server/src/streaming/streams/topics.rs +++ b/core/server/src/streaming/streams/topics.rs @@ -287,7 +287,7 @@ mod tests { #[tokio::test] async fn should_get_topic_by_id_and_name() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index bf249d2e..28a8705c 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -440,7 +440,7 @@ mod tests { async fn get_topic() -> Topic { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); diff --git a/core/server/src/streaming/topics/messages.rs b/core/server/src/streaming/topics/messages.rs index e8c52385..c01687a6 100644 --- a/core/server/src/streaming/topics/messages.rs +++ b/core/server/src/streaming/topics/messages.rs @@ -322,7 +322,7 @@ mod tests { async fn init_topic(partitions_count: u32) -> Topic { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs index bfb7ead0..db74fc98 100644 --- a/core/server/src/streaming/topics/storage.rs +++ b/core/server/src/streaming/topics/storage.rs @@ -34,7 +34,7 @@ use std::path::Path; use std::sync::Arc; use tokio::fs; use tokio::fs::create_dir_all; -use tokio::sync::{Mutex}; +use tokio::sync::Mutex; use tracing::{error, info, warn}; #[derive(Debug)] diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs index 7e8ca4ac..2e9e38c9 100644 --- a/core/server/src/streaming/topics/topic.rs +++ b/core/server/src/streaming/topics/topic.rs @@ -49,7 +49,7 @@ pub struct Topic { pub(crate) messages_count_of_parent_stream: Arc<AtomicU64>, pub(crate) messages_count: Arc<AtomicU64>, pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>, - pub(crate) config: Rc<SystemConfig>, + pub(crate) config: Arc<SystemConfig>, pub(crate) partitions: AHashMap<u32, IggyRwLock<Partition>>, pub(crate) storage: Rc<SystemStorage>, pub(crate) consumer_groups: RefCell<AHashMap<u32, ConsumerGroup>>, @@ -72,7 +72,7 @@ impl Topic { size_of_parent_stream: Arc<AtomicU64>, messages_count_of_parent_stream: Arc<AtomicU64>, segments_count_of_parent_stream: Arc<AtomicU32>, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, storage: Rc<SystemStorage>, ) -> Topic { Topic::create( @@ -100,7 +100,7 @@ impl Topic { topic_id: u32, name: &str, partitions_count: u32, - config: Rc<SystemConfig>, + config: Arc<SystemConfig>, storage: Rc<SystemStorage>, size_of_parent_stream: Arc<AtomicU64>, messages_count_of_parent_stream: Arc<AtomicU64>, @@ -290,7 +290,7 @@ mod tests { #[tokio::test] async fn should_be_created_given_valid_parameters() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index e05b7359..198e8626 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -154,7 +154,7 @@ impl MemoryPool { } /// Initialize the global pool from the given config. - pub fn init_pool(config: Rc<SystemConfig>) { + pub fn init_pool(config: Arc<SystemConfig>) { let is_enabled = config.memory_pool.enabled; let memory_limit = config.memory_pool.size.as_bytes_usize(); let bucket_capacity = config.memory_pool.bucket_capacity as usize; @@ -468,7 +468,7 @@ mod tests { fn initialize_pool_for_tests() { TEST_INIT.call_once(|| { - let config = Rc::new(SystemConfig { + let config = Arc::new(SystemConfig { memory_pool: MemoryPoolConfig { enabled: true, size: IggyByteSize::from_str("4GiB").unwrap(), diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 22796812..2af9f494 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -28,10 +28,7 @@ use tracing::debug; const STATUS_OK: &[u8] = &[0; 4]; -pub(crate) async fn read<T, B>( - stream: &mut T, - buffer: B, -) -> (Result<usize, IggyError>, B) +pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize, IggyError>, B) where T: AsyncReadRent + AsyncWriteRent + Unpin, B: IoBufMut, diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index ffa01eb1..ac29e2b9 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -54,8 +54,11 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) { //TODO: Those can be shared with other shards. shard.add_active_session(session.clone()); // Broadcast session to all shards. + //TODO: Fixme + /* let event = Rc::new(ShardEvent::NewSession(session.clone())); shard.broadcast_event_to_all_shards(session.client_id, event); + */ let _client_id = session.client_id; info!("Created new session: {session}");