This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_concurrency_issue_with_mesage_saver_task in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fb5fb422171b01f04971ffd5d1b7f26cb8dbed5c Author: numminex <[email protected]> AuthorDate: Thu Nov 20 17:39:32 2025 +0100 fix(server): fix concurrency bug with message saver --- core/metadata/src/impls/metadata.rs | 18 ++++++++++++++++-- .../src/streaming/segments/messages/messages_reader.rs | 4 ++++ .../src/streaming/segments/messages/messages_writer.rs | 4 ++++ core/server/src/streaming/segments/storage.rs | 12 ++++++++++-- 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index f802d53f1..93a1a4e49 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -65,5 +65,19 @@ where } // TODO: Hide with associated types all of those generics, so they are not leaking to the upper layer -#[expect(unused)] -pub trait MetadataHandle {} +// Something like this: +// pub trait MetadataHandle { +// type Consensus: Consensus<Self::Clock>; +// type Clock: Clock; +// type MuxStm; +// type Journal; +// type Snapshot; +// } + +// pub trait Metadata<H: MetadataHandle> { +// fn on_request(&self, message: <H::Consensus as Consensus<H::Clock>>::RequestMessage); // Create type aliases for those long associated types +// fn on_replicate(&self, message: <H::Consensus as Consensus<H::Clock>>::ReplicateMessage); +// fn on_ack(&self, message: <H::Consensus as Consensus<H::Clock>>::AckMessage); +// } + +// The error messages can get ugly from those associated types, but I think it's worth the fact that it hides a lot of the generics and their bounds. diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs index 3a419947b..9f4335b9a 100644 --- a/core/server/src/streaming/segments/messages/messages_reader.rs +++ b/core/server/src/streaming/segments/messages/messages_reader.rs @@ -34,6 +34,7 @@ use tracing::{error, trace}; /// A dedicated struct for reading from the messages file. #[derive(Debug)] pub struct MessagesReader { + flock: Rc<tokio::sync::RwLock<()>>, file_path: String, file: File, messages_size_bytes: Rc<AtomicU64>, @@ -47,6 +48,7 @@ impl MessagesReader { pub async fn new( file_path: &str, messages_size_bytes: Rc<AtomicU64>, + flock: Rc<tokio::sync::RwLock<()>>, ) -> Result<Self, IggyError> { let file = OpenOptions::new() .read(true) @@ -81,6 +83,7 @@ impl MessagesReader { file_path: file_path.to_string(), file, messages_size_bytes, + flock, }) } @@ -135,6 +138,7 @@ impl MessagesReader { &self, indexes: IggyIndexesMut, ) -> Result<IggyMessagesBatchMut, IggyError> { + let _flock = self.flock.read().await; let file_size = self.file_size(); if file_size == 0 { return Ok(IggyMessagesBatchMut::empty()); diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index b0e48badc..bf318f570 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -29,6 +29,7 @@ use tracing::trace; /// A dedicated struct for writing to the messages file. #[derive(Debug)] pub struct MessagesWriter { + flock: Rc<tokio::sync::RwLock<()>>, file_path: String, file: File, messages_size_bytes: Rc<AtomicU64>, @@ -49,6 +50,7 @@ impl MessagesWriter { messages_size_bytes: Rc<AtomicU64>, fsync: bool, file_exists: bool, + flock: Rc<tokio::sync::RwLock<()>>, ) -> Result<Self, IggyError> { let file = OpenOptions::new() .create(true) @@ -86,6 +88,7 @@ impl MessagesWriter { file, messages_size_bytes, fsync, + flock, }) } @@ -94,6 +97,7 @@ impl MessagesWriter { &self, batch_set: IggyMessagesBatchSet, ) -> Result<IggyByteSize, IggyError> { + let _flock = self.flock.write().await; let messages_size = batch_set.size(); let messages_count = batch_set.count(); let containers_count = batch_set.containers_count(); diff --git a/core/server/src/streaming/segments/storage.rs b/core/server/src/streaming/segments/storage.rs index 9b04d10bb..f3ebe4bfa 100644 --- a/core/server/src/streaming/segments/storage.rs +++ b/core/server/src/streaming/segments/storage.rs @@ -47,8 +47,16 @@ impl Storage { ) -> Result<Self, IggyError> { let size = Rc::new(AtomicU64::new(messages_size)); let indexes_size = Rc::new(AtomicU64::new(indexes_size)); + let flock = Rc::new(tokio::sync::RwLock::new(())); let messages_writer = Rc::new( - MessagesWriter::new(messages_path, size.clone(), log_fsync, file_exists).await?, + MessagesWriter::new( + messages_path, + size.clone(), + log_fsync, + file_exists, + flock.clone(), + ) + .await?, ); let index_writer = Rc::new( @@ -60,7 +68,7 @@ impl Storage { index_writer.fsync().await?; } - let messages_reader = Rc::new(MessagesReader::new(messages_path, size).await?); + let messages_reader = Rc::new(MessagesReader::new(messages_path, size, flock).await?); let index_reader = Rc::new(IndexReader::new(index_path, indexes_size).await?); Ok(Self { messages_writer: Some(messages_writer),
