This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_concurrency_issue_with_mesage_saver_task
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fb5fb422171b01f04971ffd5d1b7f26cb8dbed5c
Author: numminex <[email protected]>
AuthorDate: Thu Nov 20 17:39:32 2025 +0100

    fix(server): fix concurrency bug with message saver
---
 core/metadata/src/impls/metadata.rs                    | 18 ++++++++++++++++--
 .../src/streaming/segments/messages/messages_reader.rs |  4 ++++
 .../src/streaming/segments/messages/messages_writer.rs |  4 ++++
 core/server/src/streaming/segments/storage.rs          | 12 ++++++++++--
 4 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index f802d53f1..93a1a4e49 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -65,5 +65,19 @@ where
 }
 
 // TODO: Hide with associated types all of those generics, so they are not 
leaking to the upper layer
-#[expect(unused)]
-pub trait MetadataHandle {}
+// Something like this:
+// pub trait MetadataHandle {
+//     type Consensus: Consensus<Self::Clock>;
+//     type Clock: Clock;
+//     type MuxStm;
+//     type Journal;
+//     type Snapshot;
+// }
+
+// pub trait Metadata<H: MetadataHandle> {
+//     fn on_request(&self, message: <H::Consensus as 
Consensus<H::Clock>>::RequestMessage); // Create type aliases for those long 
associated types
+//     fn on_replicate(&self, message: <H::Consensus as 
Consensus<H::Clock>>::ReplicateMessage);
+//     fn on_ack(&self, message: <H::Consensus as 
Consensus<H::Clock>>::AckMessage);
+// }
+
+// The error messages can get ugly from those associated types, but I think 
it's worth the fact that it hides a lot of the generics and their bounds.
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs 
b/core/server/src/streaming/segments/messages/messages_reader.rs
index 3a419947b..9f4335b9a 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -34,6 +34,7 @@ use tracing::{error, trace};
 /// A dedicated struct for reading from the messages file.
 #[derive(Debug)]
 pub struct MessagesReader {
+    flock: Rc<tokio::sync::RwLock<()>>,
     file_path: String,
     file: File,
     messages_size_bytes: Rc<AtomicU64>,
@@ -47,6 +48,7 @@ impl MessagesReader {
     pub async fn new(
         file_path: &str,
         messages_size_bytes: Rc<AtomicU64>,
+        flock: Rc<tokio::sync::RwLock<()>>,
     ) -> Result<Self, IggyError> {
         let file = OpenOptions::new()
             .read(true)
@@ -81,6 +83,7 @@ impl MessagesReader {
             file_path: file_path.to_string(),
             file,
             messages_size_bytes,
+            flock,
         })
     }
 
@@ -135,6 +138,7 @@ impl MessagesReader {
         &self,
         indexes: IggyIndexesMut,
     ) -> Result<IggyMessagesBatchMut, IggyError> {
+        let _flock = self.flock.read().await;
         let file_size = self.file_size();
         if file_size == 0 {
             return Ok(IggyMessagesBatchMut::empty());
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index b0e48badc..bf318f570 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -29,6 +29,7 @@ use tracing::trace;
 /// A dedicated struct for writing to the messages file.
 #[derive(Debug)]
 pub struct MessagesWriter {
+    flock: Rc<tokio::sync::RwLock<()>>,
     file_path: String,
     file: File,
     messages_size_bytes: Rc<AtomicU64>,
@@ -49,6 +50,7 @@ impl MessagesWriter {
         messages_size_bytes: Rc<AtomicU64>,
         fsync: bool,
         file_exists: bool,
+        flock: Rc<tokio::sync::RwLock<()>>,
     ) -> Result<Self, IggyError> {
         let file = OpenOptions::new()
             .create(true)
@@ -86,6 +88,7 @@ impl MessagesWriter {
             file,
             messages_size_bytes,
             fsync,
+            flock,
         })
     }
 
@@ -94,6 +97,7 @@ impl MessagesWriter {
         &self,
         batch_set: IggyMessagesBatchSet,
     ) -> Result<IggyByteSize, IggyError> {
+        let _flock = self.flock.write().await;
         let messages_size = batch_set.size();
         let messages_count = batch_set.count();
         let containers_count = batch_set.containers_count();
diff --git a/core/server/src/streaming/segments/storage.rs 
b/core/server/src/streaming/segments/storage.rs
index 9b04d10bb..f3ebe4bfa 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -47,8 +47,16 @@ impl Storage {
     ) -> Result<Self, IggyError> {
         let size = Rc::new(AtomicU64::new(messages_size));
         let indexes_size = Rc::new(AtomicU64::new(indexes_size));
+        let flock = Rc::new(tokio::sync::RwLock::new(()));
         let messages_writer = Rc::new(
-            MessagesWriter::new(messages_path, size.clone(), log_fsync, 
file_exists).await?,
+            MessagesWriter::new(
+                messages_path,
+                size.clone(),
+                log_fsync,
+                file_exists,
+                flock.clone(),
+            )
+            .await?,
         );
 
         let index_writer = Rc::new(
@@ -60,7 +68,7 @@ impl Storage {
             index_writer.fsync().await?;
         }
 
-        let messages_reader = Rc::new(MessagesReader::new(messages_path, 
size).await?);
+        let messages_reader = Rc::new(MessagesReader::new(messages_path, size, 
flock).await?);
         let index_reader = Rc::new(IndexReader::new(index_path, 
indexes_size).await?);
         Ok(Self {
             messages_writer: Some(messages_writer),

Reply via email to