This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 3e95ef743 feat(io_uring): remove MockPersister and MockState (#2262)
3e95ef743 is described below
commit 3e95ef743166d4766ce3ebdb22b72468ff74b5dc
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Oct 14 11:16:38 2025 +0200
feat(io_uring): remove MockPersister and MockState (#2262)
---
Cargo.lock | 1 -
core/integration/tests/server/mod.rs | 10 ++--
...h_multiple_clients_polling_messages_scenario.rs | 2 +-
core/integration/tests/state/file.rs | 1 -
core/integration/tests/state/system.rs | 1 -
core/server/Cargo.toml | 1 -
core/server/src/log/runtime.rs | 8 +---
core/server/src/main.rs | 14 +++---
core/server/src/shard/builder.rs | 6 +--
core/server/src/shard/mod.rs | 13 ++---
core/server/src/shard/system/messages.rs | 4 +-
core/server/src/shard/system/snapshot/mod.rs | 2 +-
core/server/src/shard/system/streams.rs | 14 ++++--
core/server/src/shard/system/utils.rs | 18 +++----
core/server/src/slab/streams.rs | 21 ++++++--
core/server/src/slab/traits_ext.rs | 4 --
core/server/src/state/file.rs | 10 ++--
core/server/src/state/mod.rs | 56 +---------------------
core/server/src/state/system.rs | 5 +-
core/server/src/streaming/persistence/persister.rs | 46 +++++-------------
20 files changed, 82 insertions(+), 155 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 7c5d12e8a..f92a8002a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7509,7 +7509,6 @@ dependencies = [
"jsonwebtoken",
"lending-iterator",
"mimalloc",
- "mockall",
"moka",
"nix",
"once_cell",
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index 15d559d8a..bc0620725 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,7 +22,6 @@ mod general;
mod scenarios;
mod specific;
-use compio::rustls::pki_types::IpAddr;
use iggy_common::TransportProtocol;
use integration::{
http_client::HttpClientFactory,
@@ -37,8 +36,8 @@ use scenarios::{
consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
};
-use std::{collections::HashMap, future::Future};
use std::pin::Pin;
+use std::{collections::HashMap, future::Future};
type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> +
'_>>;
@@ -82,10 +81,11 @@ async fn run_scenario(transport: TransportProtocol,
scenario: ScenarioFn) {
// TODO: Need to enable `TCP_NODELAY` flag for TCP transports, due to
small messages being used in the test.
// For some reason TCP in compio can't deal with it, but in tokio it works
fine.
let mut extra_envs = HashMap::new();
- extra_envs.insert("IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
"true".to_string());
extra_envs.insert(
- "IGGY_TCP_SOCKET_NODELAY".to_string(),
- "true".to_string());
+ "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+ "true".to_string(),
+ );
+ extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(),
"true".to_string());
let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
test_server.start();
diff --git
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index ec52814c7..44746bf32 100644
---
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -24,7 +24,7 @@ use iggy::prelude::*;
use integration::test_server::{
ClientFactory, assert_clean_system, create_user, login_root, login_user,
};
-use std::str::{FromStr, from_utf8};
+use std::str::FromStr;
pub async fn run(client_factory: &dyn ClientFactory) {
let system_client = create_client(client_factory).await;
diff --git a/core/integration/tests/state/file.rs
b/core/integration/tests/state/file.rs
index 2b718d81f..9b1a9d181 100644
--- a/core/integration/tests/state/file.rs
+++ b/core/integration/tests/state/file.rs
@@ -21,7 +21,6 @@ use bytes::Bytes;
use iggy::prelude::BytesSerializable;
use iggy_common::create_stream::CreateStream;
use iggy_common::create_user::CreateUser;
-use server::state::State;
use server::state::command::EntryCommand;
use server::state::entry::StateEntry;
use server::state::models::{CreateStreamWithId, CreateUserWithId};
diff --git a/core/integration/tests/state/system.rs
b/core/integration/tests/state/system.rs
index 77eeb5851..60ee5c700 100644
--- a/core/integration/tests/state/system.rs
+++ b/core/integration/tests/state/system.rs
@@ -25,7 +25,6 @@ use iggy_common::create_stream::CreateStream;
use iggy_common::create_topic::CreateTopic;
use iggy_common::create_user::CreateUser;
use iggy_common::delete_stream::DeleteStream;
-use server::state::State;
use server::state::command::EntryCommand;
use server::state::models::{
CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash,
CreateStreamWithId,
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 6301615e6..79f8ed8be 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -141,5 +141,4 @@ vergen-git2 = { version = "1.0.7", features = [
] }
[dev-dependencies]
-mockall = { workspace = true }
serial_test = { workspace = true }
diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs
index cd62736f4..2ee5318b3 100644
--- a/core/server/src/log/runtime.rs
+++ b/core/server/src/log/runtime.rs
@@ -10,13 +10,9 @@ impl Runtime for CompioRuntime {
where
F: Future<Output = ()> + Send + 'static,
{
- // It's fine to detach this task, the documentation for `spawn` method
on `Runtime` trait says:
- //
- //
- // "This is mainly used to run batch span processing in the
background. Note, that the function
+ // This is mainly used to run batch span processing in the background.
Note, that the function
// does not return a handle. OpenTelemetry will use a different way to
wait for the future to
- // finish when the caller shuts down.""
- // TODO(hubcio): investigate if we can use TaskRegistry API for task
spawn
+ // finish when the caller shuts down.
compio::runtime::spawn(future).detach();
}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index a4ecdcc75..910d9a518 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -39,7 +39,6 @@ use server::shard::{IggyShard, ShardInfo,
calculate_shard_assignment};
use server::slab::traits_ext::{
EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
};
-use server::state::StateKind;
use server::state::file::FileState;
use server::state::system::SystemState;
use server::streaming::diagnostics::metrics::Metrics;
@@ -81,12 +80,11 @@ async fn main() -> Result<(), ServerError> {
);
}
let args = Args::parse();
+
// FIRST DISCRETE LOADING STEP.
- // TODO: I think we could get rid of config provider, since we support
only TOML
- // as config provider.
- let config_provider = config_provider::resolve(&args.config_provider)?;
// Load config and create directories.
// Remove `local_data` directory if run with `--fresh` flag.
+ let config_provider = config_provider::resolve(&args.config_provider)?;
let config = load_config(&config_provider)
.await
.with_error_context(|error| {
@@ -197,12 +195,12 @@ async fn main() -> Result<(), ServerError> {
// TENTH DISCRETE LOADING STEP.
let state_persister = resolve_persister(config.system.state.enforce_fsync);
- let state = StateKind::File(FileState::new(
+ let state = FileState::new(
&config.system.get_state_messages_file_path(),
¤t_version,
state_persister,
encryptor.clone(),
- ));
+ );
let state = SystemState::load(state).await?;
let (streams_state, users_state) = state.decompose();
let streams = load_streams(streams_state.into_values(),
&config.system).await?;
@@ -284,12 +282,12 @@ async fn main() -> Result<(), ServerError> {
let encryptor = encryptor.clone();
let metrics = metrics.clone();
let state_persister =
resolve_persister(config.system.state.enforce_fsync);
- let state = StateKind::File(FileState::new(
+ let state = FileState::new(
&config.system.get_state_messages_file_path(),
¤t_version,
state_persister,
encryptor.clone(),
- ));
+ );
// TODO: Explore decoupling the `Log` from `Partition` entity.
// Ergh... I knew this will backfire to include `Log` as part of the
`Partition` entity,
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 86abdeab1..dce9b268b 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -24,7 +24,7 @@ use crate::{
configs::server::ServerConfig,
shard::{Shard, ShardInfo, namespace::IggyNamespace},
slab::{streams::Streams, users::Users},
- state::StateKind,
+ state::file::FileState,
streaming::{diagnostics::metrics::Metrics, utils::ptr::EternalPtr},
versioning::SemanticVersion,
};
@@ -37,7 +37,7 @@ pub struct IggyShardBuilder {
id: Option<u16>,
streams: Option<Streams>,
shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardInfo>>>,
- state: Option<StateKind>,
+ state: Option<FileState>,
users: Option<Users>,
connections: Option<Vec<ShardConnector<ShardFrame>>>,
config: Option<ServerConfig>,
@@ -85,7 +85,7 @@ impl IggyShardBuilder {
self
}
- pub fn state(mut self, state: StateKind) -> Self {
+ pub fn state(mut self, state: FileState) -> Self {
self.state = Some(state);
self
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 94f61df33..0996e1fc1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -38,11 +38,10 @@ use crate::{
},
shard_error, shard_info,
slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
- state::StateKind,
+ state::file::FileState,
streaming::{
- clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
- session::Session, traits::MainOps,
- users::permissioner::Permissioner, utils::ptr::EternalPtr,
+ clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
session::Session,
+ traits::MainOps, users::permissioner::Permissioner,
utils::ptr::EternalPtr,
},
versioning::SemanticVersion,
};
@@ -51,9 +50,7 @@ use compio::io::AsyncWriteAtExt;
use dashmap::DashMap;
use error_set::ErrContext;
use hash32::{Hasher, Murmur3Hasher};
-use iggy_common::{
- EncryptorKind, Identifier, IggyError, TransportProtocol,
-};
+use iggy_common::{EncryptorKind, Identifier, IggyError, TransportProtocol};
use std::hash::Hasher as _;
use std::{
cell::{Cell, RefCell},
@@ -120,7 +117,7 @@ pub struct IggyShard {
pub(crate) streams2: Streams,
pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
// TODO: Refactor.
- pub(crate) state: StateKind,
+ pub(crate) state: FileState,
// Temporal...
pub(crate) encryptor: Option<EncryptorKind>,
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 94e1aaaca..7c543c3b7 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -48,7 +48,7 @@ impl IggyShard {
batch: IggyMessagesBatchMut,
) -> Result<(), IggyError> {
self.ensure_topic_exists(&stream_id, &topic_id)?;
-
+
let numeric_stream_id = self
.streams2
.with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
@@ -164,7 +164,7 @@ impl IggyShard {
args: PollingArgs,
) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
self.ensure_topic_exists(&stream_id, &topic_id)?;
-
+
let numeric_stream_id = self
.streams2
.with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
diff --git a/core/server/src/shard/system/snapshot/mod.rs
b/core/server/src/shard/system/snapshot/mod.rs
index b97b5dcd2..35d94ef00 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -31,7 +31,7 @@ use std::time::Instant;
use tempfile::NamedTempFile;
use tracing::{error, info};
-// TODO(hubcio): compio has a `process` module, but it currently blocks the
executor when the runtime
+// NOTE(hubcio): compio has a `process` module, but it currently blocks the
executor when the runtime
// has thread_pool_limit(0) configured (which we do on non-macOS platforms in
bootstrap.rs).
// To use compio::process::Command, we need to either:
// 1. Enable thread pool by removing/increasing thread_pool_limit(0)
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 1623d402a..4ce440599 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -231,7 +231,8 @@ mod tests {
use crate::shard::transmission::connector::ShardConnector;
use crate::slab::streams::Streams;
use crate::slab::users::Users;
- use crate::state::{MockState, StateKind};
+ use crate::state::file::FileState;
+ use crate::streaming::persistence::persister::{FilePersister,
PersisterKind};
use crate::streaming::session::Session;
use crate::streaming::streams;
use crate::streaming::users::user::User;
@@ -241,9 +242,11 @@ mod tests {
use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
use std::net::{Ipv4Addr, SocketAddr};
use std::rc::Rc;
+ use std::sync::Arc;
fn create_test_shard() -> Rc<IggyShard> {
- let _tempdir = tempfile::TempDir::new().unwrap();
+ let tempdir = tempfile::TempDir::new().unwrap();
+ let state_path = tempdir.path().join("state.log");
let config = ServerConfig::default();
let streams = Streams::default();
@@ -252,7 +255,12 @@ mod tests {
let shards_table:
EternalPtr<DashMap<crate::shard::namespace::IggyNamespace, ShardInfo>> =
shards_table.into();
let users = Users::new();
- let state = StateKind::Mock(MockState::new());
+ let state = FileState::new(
+ &state_path.to_string_lossy(),
+ &SemanticVersion::current().unwrap(),
+ Arc::new(PersisterKind::File(FilePersister)),
+ None,
+ );
let connections = vec![ShardConnector::new(0, 1)];
let builder = IggyShard::builder();
diff --git a/core/server/src/shard/system/utils.rs
b/core/server/src/shard/system/utils.rs
index 4de0e7747..89135edd3 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -3,7 +3,9 @@ use iggy_common::{Consumer, ConsumerKind, Identifier,
IggyError};
use crate::{
shard::IggyShard,
streaming::{
- polling_consumer::PollingConsumer, streams::helpers::get_stream_id,
topics::{self}
+ polling_consumer::PollingConsumer,
+ streams::helpers::get_stream_id,
+ topics::{self},
},
};
@@ -74,16 +76,14 @@ impl IggyShard {
partition_id: usize,
) -> Result<(), IggyError> {
self.ensure_topic_exists(stream_id, topic_id)?;
- let partition_exists = self.streams2.with_topic_by_id(
- stream_id,
- topic_id,
- |(root, ..)| root.partitions().exists(partition_id),
- );
+ let partition_exists = self
+ .streams2
+ .with_topic_by_id(stream_id, topic_id, |(root, ..)| {
+ root.partitions().exists(partition_id)
+ });
if !partition_exists {
- let numeric_stream_id = self
- .streams2
- .with_stream_by_id(stream_id, get_stream_id());
+ let numeric_stream_id = self.streams2.with_stream_by_id(stream_id,
get_stream_id());
let numeric_topic_id = self.streams2.with_topic_by_id(
stream_id,
topic_id,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 1b9f3835e..0ad76809c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1369,6 +1369,7 @@ impl Streams {
Ok(())
}
+ #[allow(clippy::too_many_arguments)]
pub async fn auto_commit_consumer_offset(
&self,
shard_id: u16,
@@ -1379,8 +1380,10 @@ impl Streams {
consumer: PollingConsumer,
offset: u64,
) -> Result<(), IggyError> {
- let numeric_stream_id = self.with_stream_by_id(stream_id,
streams::helpers::get_stream_id());
- let numeric_topic_id = self.with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+ let numeric_stream_id =
+ self.with_stream_by_id(stream_id,
streams::helpers::get_stream_id());
+ let numeric_topic_id =
+ self.with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
trace!(
"Last offset: {} will be automatically stored for {}, stream: {},
topic: {}, partition: {}",
@@ -1408,7 +1411,12 @@ impl Streams {
(offset_value, path)
},
);
-
crate::streaming::partitions::storage2::persist_offset(shard_id, &path,
offset_value).await?;
+ crate::streaming::partitions::storage2::persist_offset(
+ shard_id,
+ &path,
+ offset_value,
+ )
+ .await?;
}
PollingConsumer::ConsumerGroup(cg_id, _) => {
let (offset_value, path) = self.with_partition_by_id(
@@ -1430,7 +1438,12 @@ impl Streams {
(offset_value, path)
},
);
-
crate::streaming::partitions::storage2::persist_offset(shard_id, &path,
offset_value).await?;
+ crate::streaming::partitions::storage2::persist_offset(
+ shard_id,
+ &path,
+ offset_value,
+ )
+ .await?;
}
}
diff --git a/core/server/src/slab/traits_ext.rs
b/core/server/src/slab/traits_ext.rs
index 6a23d30e9..de4e11df9 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -49,10 +49,6 @@ pub struct Borrow;
/// Marker type for component containers that use interior mutability.
pub struct InteriorMutability;
-mod private {
- pub trait Sealed {}
-}
-
// I think it's better to *NOT* use `Components` directly on the `with`
methods.
// Instead use the `Self::EntityRef` type directly.
// This way we can auto implement the `with_by_id` method.
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 54a594692..cc6de3be1 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -17,7 +17,7 @@
*/
use crate::state::command::EntryCommand;
-use crate::state::{COMPONENT, State, StateEntry};
+use crate::state::{COMPONENT, StateEntry};
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::utils::file;
use crate::versioning::SemanticVersion;
@@ -80,10 +80,8 @@ impl FileState {
pub fn term(&self) -> u64 {
self.term.load(Ordering::SeqCst)
}
-}
-impl State for FileState {
- async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
+ pub async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
assert!(Path::new(&self.path).exists());
let entries = self.load_entries().await.with_error_context(|error| {
@@ -101,7 +99,7 @@ impl State for FileState {
Ok(entries)
}
- async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
+ pub async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
if !Path::new(&self.path).exists() {
return Err(IggyError::StateFileNotFound);
}
@@ -297,7 +295,7 @@ impl State for FileState {
Ok(entries)
}
- async fn apply(&self, user_id: u32, command: &EntryCommand) -> Result<(),
IggyError> {
+ pub async fn apply(&self, user_id: u32, command: &EntryCommand) ->
Result<(), IggyError> {
debug!("Applying state entry with command: {command}, user ID:
{user_id}");
let timestamp = IggyTimestamp::now();
let index = if self.entries_count.load(Ordering::SeqCst) == 0 {
diff --git a/core/server/src/state/mod.rs b/core/server/src/state/mod.rs
index 2bf2c6bfb..a1aba87d1 100644
--- a/core/server/src/state/mod.rs
+++ b/core/server/src/state/mod.rs
@@ -16,14 +16,6 @@
* under the License.
*/
-use crate::state::command::EntryCommand;
-use crate::state::entry::StateEntry;
-use iggy_common::IggyError;
-#[cfg(test)]
-use mockall::automock;
-use std::fmt::Debug;
-use std::future::Future;
-
pub mod command;
pub mod entry;
pub mod file;
@@ -32,49 +24,5 @@ pub mod system;
pub const COMPONENT: &str = "STATE";
-// TODO(hubcio): I don't like this approach, we should avoid mocking and
simplify it all.
-
-#[allow(clippy::large_enum_variant)]
-#[derive(Debug)]
-pub enum StateKind {
- File(file::FileState),
- #[cfg(test)]
- Mock(MockState),
-}
-
-#[cfg_attr(test, automock)]
-pub trait State {
- fn init(&self) -> impl Future<Output = Result<Vec<StateEntry>, IggyError>>;
- fn load_entries(&self) -> impl Future<Output = Result<Vec<StateEntry>,
IggyError>>;
- fn apply(
- &self,
- user_id: u32,
- command: &EntryCommand,
- ) -> impl Future<Output = Result<(), IggyError>>;
-}
-
-impl StateKind {
- pub async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
- match self {
- Self::File(s) => s.init().await,
- #[cfg(test)]
- Self::Mock(s) => s.init().await,
- }
- }
-
- pub async fn load_entries(&self) -> Result<Vec<StateEntry>, IggyError> {
- match self {
- Self::File(s) => s.load_entries().await,
- #[cfg(test)]
- Self::Mock(s) => s.load_entries().await,
- }
- }
-
- pub async fn apply(&self, user_id: u32, command: &EntryCommand) ->
Result<(), IggyError> {
- match self {
- Self::File(s) => s.apply(user_id, command).await,
- #[cfg(test)]
- Self::Mock(s) => s.apply(user_id, command).await,
- }
- }
-}
+pub use command::EntryCommand;
+pub use entry::StateEntry;
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 31275f56c..d552e4168 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -17,8 +17,9 @@
*/
use crate::bootstrap::create_root_user;
+use crate::state::file::FileState;
use crate::state::models::CreateUserWithId;
-use crate::state::{COMPONENT, EntryCommand, StateEntry, StateKind};
+use crate::state::{COMPONENT, EntryCommand, StateEntry};
use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use ahash::AHashMap;
use error_set::ErrContext;
@@ -98,7 +99,7 @@ pub struct ConsumerGroupState {
}
impl SystemState {
- pub async fn load(state: StateKind) -> Result<Self, IggyError> {
+ pub async fn load(state: FileState) -> Result<Self, IggyError> {
let mut state_entries = state.init().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to initialize state
entries")
})?;
diff --git a/core/server/src/streaming/persistence/persister.rs
b/core/server/src/streaming/persistence/persister.rs
index ca522b856..8f5be92b0 100644
--- a/core/server/src/streaming/persistence/persister.rs
+++ b/core/server/src/streaming/persistence/persister.rs
@@ -24,17 +24,11 @@ use compio::io::AsyncWriteAtExt;
use error_set::ErrContext;
use iggy_common::IggyError;
use std::fmt::Debug;
-use std::future::Future;
-
-#[cfg(test)]
-use mockall::automock;
#[derive(Debug)]
pub enum PersisterKind {
File(FilePersister),
FileWithSync(FileWithSyncPersister),
- #[cfg(test)]
- Mock(MockPersister),
}
impl PersisterKind {
@@ -42,8 +36,6 @@ impl PersisterKind {
match self {
PersisterKind::File(p) => p.append(path, bytes).await,
PersisterKind::FileWithSync(p) => p.append(path, bytes).await,
- #[cfg(test)]
- PersisterKind::Mock(p) => p.append(path, bytes).await,
}
}
@@ -51,8 +43,6 @@ impl PersisterKind {
match self {
PersisterKind::File(p) => p.overwrite(path, bytes).await,
PersisterKind::FileWithSync(p) => p.overwrite(path, bytes).await,
- #[cfg(test)]
- PersisterKind::Mock(p) => p.overwrite(path, bytes).await,
}
}
@@ -60,32 +50,15 @@ impl PersisterKind {
match self {
PersisterKind::File(p) => p.delete(path).await,
PersisterKind::FileWithSync(p) => p.delete(path).await,
- #[cfg(test)]
- PersisterKind::Mock(p) => p.delete(path).await,
}
}
}
-#[cfg_attr(test, automock)]
-pub trait Persister {
- fn append<B: IoBuf>(&self, path: &str, bytes: B)
- -> impl Future<Output = Result<(), IggyError>>;
- fn overwrite<B: IoBuf>(
- &self,
- path: &str,
- bytes: B,
- ) -> impl Future<Output = Result<(), IggyError>>;
- fn delete(&self, path: &str) -> impl Future<Output = Result<(),
IggyError>>;
-}
-
#[derive(Debug)]
pub struct FilePersister;
-#[derive(Debug)]
-pub struct FileWithSyncPersister;
-
-impl Persister for FilePersister {
- async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
+impl FilePersister {
+ pub async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
let (mut file, position) = file::append(path)
.await
.with_error_context(|error| {
@@ -102,7 +75,7 @@ impl Persister for FilePersister {
Ok(())
}
- async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
+ pub async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) ->
Result<(), IggyError> {
let mut file = file::overwrite(path)
.await
.with_error_context(|error| {
@@ -120,7 +93,7 @@ impl Persister for FilePersister {
Ok(())
}
- async fn delete(&self, path: &str) -> Result<(), IggyError> {
+ pub async fn delete(&self, path: &str) -> Result<(), IggyError> {
remove_file(path)
.await
.with_error_context(|error| {
@@ -131,8 +104,11 @@ impl Persister for FilePersister {
}
}
-impl Persister for FileWithSyncPersister {
- async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
+#[derive(Debug)]
+pub struct FileWithSyncPersister;
+
+impl FileWithSyncPersister {
+ pub async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
let (mut file, position) = file::append(path)
.await
.with_error_context(|error| {
@@ -157,7 +133,7 @@ impl Persister for FileWithSyncPersister {
Ok(())
}
- async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(),
IggyError> {
+ pub async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) ->
Result<(), IggyError> {
let mut file = file::overwrite(path)
.await
.with_error_context(|error| {
@@ -183,7 +159,7 @@ impl Persister for FileWithSyncPersister {
Ok(())
}
- async fn delete(&self, path: &str) -> Result<(), IggyError> {
+ pub async fn delete(&self, path: &str) -> Result<(), IggyError> {
remove_file(path)
.await
.with_error_context(|error| {