This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 3e95ef743 feat(io_uring): remove MockPersister and MockState (#2262)
3e95ef743 is described below

commit 3e95ef743166d4766ce3ebdb22b72468ff74b5dc
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Oct 14 11:16:38 2025 +0200

    feat(io_uring): remove MockPersister and MockState (#2262)
---
 Cargo.lock                                         |  1 -
 core/integration/tests/server/mod.rs               | 10 ++--
 ...h_multiple_clients_polling_messages_scenario.rs |  2 +-
 core/integration/tests/state/file.rs               |  1 -
 core/integration/tests/state/system.rs             |  1 -
 core/server/Cargo.toml                             |  1 -
 core/server/src/log/runtime.rs                     |  8 +---
 core/server/src/main.rs                            | 14 +++---
 core/server/src/shard/builder.rs                   |  6 +--
 core/server/src/shard/mod.rs                       | 13 ++---
 core/server/src/shard/system/messages.rs           |  4 +-
 core/server/src/shard/system/snapshot/mod.rs       |  2 +-
 core/server/src/shard/system/streams.rs            | 14 ++++--
 core/server/src/shard/system/utils.rs              | 18 +++----
 core/server/src/slab/streams.rs                    | 21 ++++++--
 core/server/src/slab/traits_ext.rs                 |  4 --
 core/server/src/state/file.rs                      | 10 ++--
 core/server/src/state/mod.rs                       | 56 +---------------------
 core/server/src/state/system.rs                    |  5 +-
 core/server/src/streaming/persistence/persister.rs | 46 +++++-------------
 20 files changed, 82 insertions(+), 155 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7c5d12e8a..f92a8002a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7509,7 +7509,6 @@ dependencies = [
  "jsonwebtoken",
  "lending-iterator",
  "mimalloc",
- "mockall",
  "moka",
  "nix",
  "once_cell",
diff --git a/core/integration/tests/server/mod.rs 
b/core/integration/tests/server/mod.rs
index 15d559d8a..bc0620725 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,7 +22,6 @@ mod general;
 mod scenarios;
 mod specific;
 
-use compio::rustls::pki_types::IpAddr;
 use iggy_common::TransportProtocol;
 use integration::{
     http_client::HttpClientFactory,
@@ -37,8 +36,8 @@ use scenarios::{
     consumer_group_with_single_client_polling_messages_scenario, 
create_message_payload,
     message_headers_scenario, stream_size_validation_scenario, 
system_scenario, user_scenario,
 };
-use std::{collections::HashMap, future::Future};
 use std::pin::Pin;
+use std::{collections::HashMap, future::Future};
 
 type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + 
'_>>;
 
@@ -82,10 +81,11 @@ async fn run_scenario(transport: TransportProtocol, 
scenario: ScenarioFn) {
     // TODO: Need to enable `TCP_NODELAY` flag for TCP transports, due to 
small messages being used in the test.
     // For some reason TCP in compio can't deal with it, but in tokio it works 
fine.
     let mut extra_envs = HashMap::new();
-    extra_envs.insert("IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(), 
"true".to_string());
     extra_envs.insert(
-        "IGGY_TCP_SOCKET_NODELAY".to_string(),
-    "true".to_string());
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
     let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
     test_server.start();
 
diff --git 
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
 
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index ec52814c7..44746bf32 100644
--- 
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ 
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -24,7 +24,7 @@ use iggy::prelude::*;
 use integration::test_server::{
     ClientFactory, assert_clean_system, create_user, login_root, login_user,
 };
-use std::str::{FromStr, from_utf8};
+use std::str::FromStr;
 
 pub async fn run(client_factory: &dyn ClientFactory) {
     let system_client = create_client(client_factory).await;
diff --git a/core/integration/tests/state/file.rs 
b/core/integration/tests/state/file.rs
index 2b718d81f..9b1a9d181 100644
--- a/core/integration/tests/state/file.rs
+++ b/core/integration/tests/state/file.rs
@@ -21,7 +21,6 @@ use bytes::Bytes;
 use iggy::prelude::BytesSerializable;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::create_user::CreateUser;
-use server::state::State;
 use server::state::command::EntryCommand;
 use server::state::entry::StateEntry;
 use server::state::models::{CreateStreamWithId, CreateUserWithId};
diff --git a/core/integration/tests/state/system.rs 
b/core/integration/tests/state/system.rs
index 77eeb5851..60ee5c700 100644
--- a/core/integration/tests/state/system.rs
+++ b/core/integration/tests/state/system.rs
@@ -25,7 +25,6 @@ use iggy_common::create_stream::CreateStream;
 use iggy_common::create_topic::CreateTopic;
 use iggy_common::create_user::CreateUser;
 use iggy_common::delete_stream::DeleteStream;
-use server::state::State;
 use server::state::command::EntryCommand;
 use server::state::models::{
     CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash, 
CreateStreamWithId,
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 6301615e6..79f8ed8be 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -141,5 +141,4 @@ vergen-git2 = { version = "1.0.7", features = [
 ] }
 
 [dev-dependencies]
-mockall = { workspace = true }
 serial_test = { workspace = true }
diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs
index cd62736f4..2ee5318b3 100644
--- a/core/server/src/log/runtime.rs
+++ b/core/server/src/log/runtime.rs
@@ -10,13 +10,9 @@ impl Runtime for CompioRuntime {
     where
         F: Future<Output = ()> + Send + 'static,
     {
-        // It's fine to detach this task, the documentation for `spawn` method 
on `Runtime` trait says:
-        //
-        //
-        // "This is mainly used to run batch span processing in the 
background. Note, that the function
+        // This is mainly used to run batch span processing in the background. 
Note, that the function
         // does not return a handle. OpenTelemetry will use a different way to 
wait for the future to
-        // finish when the caller shuts down.""
-        // TODO(hubcio): investigate if we can use TaskRegistry API for task 
spawn
+        // finish when the caller shuts down.
         compio::runtime::spawn(future).detach();
     }
 
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index a4ecdcc75..910d9a518 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -39,7 +39,6 @@ use server::shard::{IggyShard, ShardInfo, 
calculate_shard_assignment};
 use server::slab::traits_ext::{
     EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
 };
-use server::state::StateKind;
 use server::state::file::FileState;
 use server::state::system::SystemState;
 use server::streaming::diagnostics::metrics::Metrics;
@@ -81,12 +80,11 @@ async fn main() -> Result<(), ServerError> {
         );
     }
     let args = Args::parse();
+
     // FIRST DISCRETE LOADING STEP.
-    // TODO: I think we could get rid of config provider, since we support 
only TOML
-    // as config provider.
-    let config_provider = config_provider::resolve(&args.config_provider)?;
     // Load config and create directories.
     // Remove `local_data` directory if run with `--fresh` flag.
+    let config_provider = config_provider::resolve(&args.config_provider)?;
     let config = load_config(&config_provider)
         .await
         .with_error_context(|error| {
@@ -197,12 +195,12 @@ async fn main() -> Result<(), ServerError> {
 
     // TENTH DISCRETE LOADING STEP.
     let state_persister = resolve_persister(config.system.state.enforce_fsync);
-    let state = StateKind::File(FileState::new(
+    let state = FileState::new(
         &config.system.get_state_messages_file_path(),
         &current_version,
         state_persister,
         encryptor.clone(),
-    ));
+    );
     let state = SystemState::load(state).await?;
     let (streams_state, users_state) = state.decompose();
     let streams = load_streams(streams_state.into_values(), 
&config.system).await?;
@@ -284,12 +282,12 @@ async fn main() -> Result<(), ServerError> {
         let encryptor = encryptor.clone();
         let metrics = metrics.clone();
         let state_persister = 
resolve_persister(config.system.state.enforce_fsync);
-        let state = StateKind::File(FileState::new(
+        let state = FileState::new(
             &config.system.get_state_messages_file_path(),
             &current_version,
             state_persister,
             encryptor.clone(),
-        ));
+        );
 
         // TODO: Explore decoupling the `Log` from `Partition` entity.
         // Ergh... I knew this will backfire to include `Log` as part of the 
`Partition` entity,
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 86abdeab1..dce9b268b 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -24,7 +24,7 @@ use crate::{
     configs::server::ServerConfig,
     shard::{Shard, ShardInfo, namespace::IggyNamespace},
     slab::{streams::Streams, users::Users},
-    state::StateKind,
+    state::file::FileState,
     streaming::{diagnostics::metrics::Metrics, utils::ptr::EternalPtr},
     versioning::SemanticVersion,
 };
@@ -37,7 +37,7 @@ pub struct IggyShardBuilder {
     id: Option<u16>,
     streams: Option<Streams>,
     shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardInfo>>>,
-    state: Option<StateKind>,
+    state: Option<FileState>,
     users: Option<Users>,
     connections: Option<Vec<ShardConnector<ShardFrame>>>,
     config: Option<ServerConfig>,
@@ -85,7 +85,7 @@ impl IggyShardBuilder {
         self
     }
 
-    pub fn state(mut self, state: StateKind) -> Self {
+    pub fn state(mut self, state: FileState) -> Self {
         self.state = Some(state);
         self
     }
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 94f61df33..0996e1fc1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -38,11 +38,10 @@ use crate::{
     },
     shard_error, shard_info,
     slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
-    state::StateKind,
+    state::file::FileState,
     streaming::{
-        clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
-        session::Session, traits::MainOps,
-        users::permissioner::Permissioner, utils::ptr::EternalPtr,
+        clients::client_manager::ClientManager, diagnostics::metrics::Metrics, 
session::Session,
+        traits::MainOps, users::permissioner::Permissioner, 
utils::ptr::EternalPtr,
     },
     versioning::SemanticVersion,
 };
@@ -51,9 +50,7 @@ use compio::io::AsyncWriteAtExt;
 use dashmap::DashMap;
 use error_set::ErrContext;
 use hash32::{Hasher, Murmur3Hasher};
-use iggy_common::{
-    EncryptorKind, Identifier, IggyError, TransportProtocol,
-};
+use iggy_common::{EncryptorKind, Identifier, IggyError, TransportProtocol};
 use std::hash::Hasher as _;
 use std::{
     cell::{Cell, RefCell},
@@ -120,7 +117,7 @@ pub struct IggyShard {
     pub(crate) streams2: Streams,
     pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
     // TODO: Refactor.
-    pub(crate) state: StateKind,
+    pub(crate) state: FileState,
 
     // Temporal...
     pub(crate) encryptor: Option<EncryptorKind>,
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 94e1aaaca..7c543c3b7 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -48,7 +48,7 @@ impl IggyShard {
         batch: IggyMessagesBatchMut,
     ) -> Result<(), IggyError> {
         self.ensure_topic_exists(&stream_id, &topic_id)?;
-        
+
         let numeric_stream_id = self
             .streams2
             .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
@@ -164,7 +164,7 @@ impl IggyShard {
         args: PollingArgs,
     ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
         self.ensure_topic_exists(&stream_id, &topic_id)?;
-        
+
         let numeric_stream_id = self
             .streams2
             .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
diff --git a/core/server/src/shard/system/snapshot/mod.rs 
b/core/server/src/shard/system/snapshot/mod.rs
index b97b5dcd2..35d94ef00 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -31,7 +31,7 @@ use std::time::Instant;
 use tempfile::NamedTempFile;
 use tracing::{error, info};
 
-// TODO(hubcio): compio has a `process` module, but it currently blocks the 
executor when the runtime
+// NOTE(hubcio): compio has a `process` module, but it currently blocks the 
executor when the runtime
 // has thread_pool_limit(0) configured (which we do on non-macOS platforms in 
bootstrap.rs).
 // To use compio::process::Command, we need to either:
 // 1. Enable thread pool by removing/increasing thread_pool_limit(0)
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 1623d402a..4ce440599 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -231,7 +231,8 @@ mod tests {
     use crate::shard::transmission::connector::ShardConnector;
     use crate::slab::streams::Streams;
     use crate::slab::users::Users;
-    use crate::state::{MockState, StateKind};
+    use crate::state::file::FileState;
+    use crate::streaming::persistence::persister::{FilePersister, 
PersisterKind};
     use crate::streaming::session::Session;
     use crate::streaming::streams;
     use crate::streaming::users::user::User;
@@ -241,9 +242,11 @@ mod tests {
     use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
     use std::net::{Ipv4Addr, SocketAddr};
     use std::rc::Rc;
+    use std::sync::Arc;
 
     fn create_test_shard() -> Rc<IggyShard> {
-        let _tempdir = tempfile::TempDir::new().unwrap();
+        let tempdir = tempfile::TempDir::new().unwrap();
+        let state_path = tempdir.path().join("state.log");
         let config = ServerConfig::default();
 
         let streams = Streams::default();
@@ -252,7 +255,12 @@ mod tests {
         let shards_table: 
EternalPtr<DashMap<crate::shard::namespace::IggyNamespace, ShardInfo>> =
             shards_table.into();
         let users = Users::new();
-        let state = StateKind::Mock(MockState::new());
+        let state = FileState::new(
+            &state_path.to_string_lossy(),
+            &SemanticVersion::current().unwrap(),
+            Arc::new(PersisterKind::File(FilePersister)),
+            None,
+        );
         let connections = vec![ShardConnector::new(0, 1)];
 
         let builder = IggyShard::builder();
diff --git a/core/server/src/shard/system/utils.rs 
b/core/server/src/shard/system/utils.rs
index 4de0e7747..89135edd3 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -3,7 +3,9 @@ use iggy_common::{Consumer, ConsumerKind, Identifier, 
IggyError};
 use crate::{
     shard::IggyShard,
     streaming::{
-        polling_consumer::PollingConsumer, streams::helpers::get_stream_id, 
topics::{self}
+        polling_consumer::PollingConsumer,
+        streams::helpers::get_stream_id,
+        topics::{self},
     },
 };
 
@@ -74,16 +76,14 @@ impl IggyShard {
         partition_id: usize,
     ) -> Result<(), IggyError> {
         self.ensure_topic_exists(stream_id, topic_id)?;
-        let partition_exists = self.streams2.with_topic_by_id(
-            stream_id,
-            topic_id,
-            |(root, ..)| root.partitions().exists(partition_id),
-        );
+        let partition_exists = self
+            .streams2
+            .with_topic_by_id(stream_id, topic_id, |(root, ..)| {
+                root.partitions().exists(partition_id)
+            });
 
         if !partition_exists {
-            let numeric_stream_id = self
-                .streams2
-                .with_stream_by_id(stream_id, get_stream_id());
+            let numeric_stream_id = self.streams2.with_stream_by_id(stream_id, 
get_stream_id());
             let numeric_topic_id = self.streams2.with_topic_by_id(
                 stream_id,
                 topic_id,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 1b9f3835e..0ad76809c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1369,6 +1369,7 @@ impl Streams {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub async fn auto_commit_consumer_offset(
         &self,
         shard_id: u16,
@@ -1379,8 +1380,10 @@ impl Streams {
         consumer: PollingConsumer,
         offset: u64,
     ) -> Result<(), IggyError> {
-        let numeric_stream_id = self.with_stream_by_id(stream_id, 
streams::helpers::get_stream_id());
-        let numeric_topic_id = self.with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
+        let numeric_stream_id =
+            self.with_stream_by_id(stream_id, 
streams::helpers::get_stream_id());
+        let numeric_topic_id =
+            self.with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
 
         trace!(
             "Last offset: {} will be automatically stored for {}, stream: {}, 
topic: {}, partition: {}",
@@ -1408,7 +1411,12 @@ impl Streams {
                         (offset_value, path)
                     },
                 );
-                
crate::streaming::partitions::storage2::persist_offset(shard_id, &path, 
offset_value).await?;
+                crate::streaming::partitions::storage2::persist_offset(
+                    shard_id,
+                    &path,
+                    offset_value,
+                )
+                .await?;
             }
             PollingConsumer::ConsumerGroup(cg_id, _) => {
                 let (offset_value, path) = self.with_partition_by_id(
@@ -1430,7 +1438,12 @@ impl Streams {
                         (offset_value, path)
                     },
                 );
-                
crate::streaming::partitions::storage2::persist_offset(shard_id, &path, 
offset_value).await?;
+                crate::streaming::partitions::storage2::persist_offset(
+                    shard_id,
+                    &path,
+                    offset_value,
+                )
+                .await?;
             }
         }
 
diff --git a/core/server/src/slab/traits_ext.rs 
b/core/server/src/slab/traits_ext.rs
index 6a23d30e9..de4e11df9 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -49,10 +49,6 @@ pub struct Borrow;
 /// Marker type for component containers that use interior mutability.
 pub struct InteriorMutability;
 
-mod private {
-    pub trait Sealed {}
-}
-
 // I think it's better to *NOT* use `Components` directly on the `with` 
methods.
 // Instead use the `Self::EntityRef` type directly.
 // This way we can auto implement the `with_by_id` method.
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 54a594692..cc6de3be1 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::state::command::EntryCommand;
-use crate::state::{COMPONENT, State, StateEntry};
+use crate::state::{COMPONENT, StateEntry};
 use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::utils::file;
 use crate::versioning::SemanticVersion;
@@ -80,10 +80,8 @@ impl FileState {
     pub fn term(&self) -> u64 {
         self.term.load(Ordering::SeqCst)
     }
-}
 
-impl State for FileState {
-    async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
+    pub async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
         assert!(Path::new(&self.path).exists());
 
         let entries = self.load_entries().await.with_error_context(|error| {
@@ -101,7 +99,7 @@ impl State for FileState {
         Ok(entries)
     }
 
-    async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
+    pub async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
         if !Path::new(&self.path).exists() {
             return Err(IggyError::StateFileNotFound);
         }
@@ -297,7 +295,7 @@ impl State for FileState {
         Ok(entries)
     }
 
-    async fn apply(&self, user_id: u32, command: &EntryCommand) -> Result<(), 
IggyError> {
+    pub async fn apply(&self, user_id: u32, command: &EntryCommand) -> 
Result<(), IggyError> {
         debug!("Applying state entry with command: {command}, user ID: 
{user_id}");
         let timestamp = IggyTimestamp::now();
         let index = if self.entries_count.load(Ordering::SeqCst) == 0 {
diff --git a/core/server/src/state/mod.rs b/core/server/src/state/mod.rs
index 2bf2c6bfb..a1aba87d1 100644
--- a/core/server/src/state/mod.rs
+++ b/core/server/src/state/mod.rs
@@ -16,14 +16,6 @@
  * under the License.
  */
 
-use crate::state::command::EntryCommand;
-use crate::state::entry::StateEntry;
-use iggy_common::IggyError;
-#[cfg(test)]
-use mockall::automock;
-use std::fmt::Debug;
-use std::future::Future;
-
 pub mod command;
 pub mod entry;
 pub mod file;
@@ -32,49 +24,5 @@ pub mod system;
 
 pub const COMPONENT: &str = "STATE";
 
-// TODO(hubcio): I don't like this approach, we should avoid mocking and 
simplify it all.
-
-#[allow(clippy::large_enum_variant)]
-#[derive(Debug)]
-pub enum StateKind {
-    File(file::FileState),
-    #[cfg(test)]
-    Mock(MockState),
-}
-
-#[cfg_attr(test, automock)]
-pub trait State {
-    fn init(&self) -> impl Future<Output = Result<Vec<StateEntry>, IggyError>>;
-    fn load_entries(&self) -> impl Future<Output = Result<Vec<StateEntry>, 
IggyError>>;
-    fn apply(
-        &self,
-        user_id: u32,
-        command: &EntryCommand,
-    ) -> impl Future<Output = Result<(), IggyError>>;
-}
-
-impl StateKind {
-    pub async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
-        match self {
-            Self::File(s) => s.init().await,
-            #[cfg(test)]
-            Self::Mock(s) => s.init().await,
-        }
-    }
-
-    pub async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
-        match self {
-            Self::File(s) => s.load_entries().await,
-            #[cfg(test)]
-            Self::Mock(s) => s.load_entries().await,
-        }
-    }
-
-    pub async fn apply(&self, user_id: u32, command: &EntryCommand) -> 
Result<(), IggyError> {
-        match self {
-            Self::File(s) => s.apply(user_id, command).await,
-            #[cfg(test)]
-            Self::Mock(s) => s.apply(user_id, command).await,
-        }
-    }
-}
+pub use command::EntryCommand;
+pub use entry::StateEntry;
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 31275f56c..d552e4168 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -17,8 +17,9 @@
  */
 
 use crate::bootstrap::create_root_user;
+use crate::state::file::FileState;
 use crate::state::models::CreateUserWithId;
-use crate::state::{COMPONENT, EntryCommand, StateEntry, StateKind};
+use crate::state::{COMPONENT, EntryCommand, StateEntry};
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use ahash::AHashMap;
 use error_set::ErrContext;
@@ -98,7 +99,7 @@ pub struct ConsumerGroupState {
 }
 
 impl SystemState {
-    pub async fn load(state: StateKind) -> Result<Self, IggyError> {
+    pub async fn load(state: FileState) -> Result<Self, IggyError> {
         let mut state_entries = state.init().await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
         })?;
diff --git a/core/server/src/streaming/persistence/persister.rs 
b/core/server/src/streaming/persistence/persister.rs
index ca522b856..8f5be92b0 100644
--- a/core/server/src/streaming/persistence/persister.rs
+++ b/core/server/src/streaming/persistence/persister.rs
@@ -24,17 +24,11 @@ use compio::io::AsyncWriteAtExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use std::fmt::Debug;
-use std::future::Future;
-
-#[cfg(test)]
-use mockall::automock;
 
 #[derive(Debug)]
 pub enum PersisterKind {
     File(FilePersister),
     FileWithSync(FileWithSyncPersister),
-    #[cfg(test)]
-    Mock(MockPersister),
 }
 
 impl PersisterKind {
@@ -42,8 +36,6 @@ impl PersisterKind {
         match self {
             PersisterKind::File(p) => p.append(path, bytes).await,
             PersisterKind::FileWithSync(p) => p.append(path, bytes).await,
-            #[cfg(test)]
-            PersisterKind::Mock(p) => p.append(path, bytes).await,
         }
     }
 
@@ -51,8 +43,6 @@ impl PersisterKind {
         match self {
             PersisterKind::File(p) => p.overwrite(path, bytes).await,
             PersisterKind::FileWithSync(p) => p.overwrite(path, bytes).await,
-            #[cfg(test)]
-            PersisterKind::Mock(p) => p.overwrite(path, bytes).await,
         }
     }
 
@@ -60,32 +50,15 @@ impl PersisterKind {
         match self {
             PersisterKind::File(p) => p.delete(path).await,
             PersisterKind::FileWithSync(p) => p.delete(path).await,
-            #[cfg(test)]
-            PersisterKind::Mock(p) => p.delete(path).await,
         }
     }
 }
 
-#[cfg_attr(test, automock)]
-pub trait Persister {
-    fn append<B: IoBuf>(&self, path: &str, bytes: B)
-    -> impl Future<Output = Result<(), IggyError>>;
-    fn overwrite<B: IoBuf>(
-        &self,
-        path: &str,
-        bytes: B,
-    ) -> impl Future<Output = Result<(), IggyError>>;
-    fn delete(&self, path: &str) -> impl Future<Output = Result<(), 
IggyError>>;
-}
-
 #[derive(Debug)]
 pub struct FilePersister;
 
-#[derive(Debug)]
-pub struct FileWithSyncPersister;
-
-impl Persister for FilePersister {
-    async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
+impl FilePersister {
+    pub async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
         let (mut file, position) = file::append(path)
             .await
             .with_error_context(|error| {
@@ -102,7 +75,7 @@ impl Persister for FilePersister {
         Ok(())
     }
 
-    async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
+    pub async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> 
Result<(), IggyError> {
         let mut file = file::overwrite(path)
             .await
             .with_error_context(|error| {
@@ -120,7 +93,7 @@ impl Persister for FilePersister {
         Ok(())
     }
 
-    async fn delete(&self, path: &str) -> Result<(), IggyError> {
+    pub async fn delete(&self, path: &str) -> Result<(), IggyError> {
         remove_file(path)
             .await
             .with_error_context(|error| {
@@ -131,8 +104,11 @@ impl Persister for FilePersister {
     }
 }
 
-impl Persister for FileWithSyncPersister {
-    async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
+#[derive(Debug)]
+pub struct FileWithSyncPersister;
+
+impl FileWithSyncPersister {
+    pub async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
         let (mut file, position) = file::append(path)
             .await
             .with_error_context(|error| {
@@ -157,7 +133,7 @@ impl Persister for FileWithSyncPersister {
         Ok(())
     }
 
-    async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), 
IggyError> {
+    pub async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> 
Result<(), IggyError> {
         let mut file = file::overwrite(path)
             .await
             .with_error_context(|error| {
@@ -183,7 +159,7 @@ impl Persister for FileWithSyncPersister {
         Ok(())
     }
 
-    async fn delete(&self, path: &str) -> Result<(), IggyError> {
+    pub async fn delete(&self, path: &str) -> Result<(), IggyError> {
         remove_file(path)
             .await
             .with_error_context(|error| {

Reply via email to