Copilot commented on code in PR #2578:
URL: https://github.com/apache/iggy/pull/2578#discussion_r2699735490


##########
core/server/src/streaming/partitions/in_flight.rs:
##########
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
             return &[];
         }
 
-        &self.batches
+        let start_idx = self
+            .batches
+            .iter()
+            .position(|b| {
+                b.last_offset()
+                    .map(|last| last >= start_offset)
+                    .unwrap_or(false)
+            })
+            .unwrap_or(0);
+
+        let end_idx = self
+            .batches
+            .iter()
+            .rposition(|b| {
+                b.first_offset()
+                    .map(|first| first <= end_offset)
+                    .unwrap_or(false)
+            })
+            .map(|i| i + 1)
+            .unwrap_or(self.batches.len());

Review Comment:
   When `rposition()` returns `None` (no batch has `first_offset <= 
end_offset`), defaulting to `self.batches.len()` is incorrect. This would 
include all batches when none actually overlap with the requested range. The 
function should return an empty slice in this case. Handle this explicitly 
before the slice operation.
   ```suggestion
           let end_idx = match self
               .batches
               .iter()
               .rposition(|b| {
                   b.first_offset()
                       .map(|first| first <= end_offset)
                       .unwrap_or(false)
               }) {
               Some(i) => i + 1,
               None => return &[],
           };
   ```



##########
core/server/src/streaming/partitions/in_flight.rs:
##########
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
             return &[];
         }
 
-        &self.batches
+        let start_idx = self
+            .batches
+            .iter()
+            .position(|b| {
+                b.last_offset()
+                    .map(|last| last >= start_offset)
+                    .unwrap_or(false)
+            })
+            .unwrap_or(0);

Review Comment:
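   When `position()` returns `None` (no batch has `last_offset >= 
start_offset`), defaulting to index 0 is incorrect: every batch then ends 
before `start_offset`, yet the slice would still start at the first batch and 
include batches that shouldn't be returned. The function should return an 
empty slice in this case. Note that `.unwrap_or_else(|| return &[])` is not an 
option, because a `return` inside a closure only returns from the closure; 
handle the `None` case explicitly (e.g. with `match` or `let ... else`) before 
the slice operation.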
   ```suggestion
           let start_idx = match self
               .batches
               .iter()
               .position(|b| {
                   b.last_offset()
                       .map(|last| last >= start_offset)
                       .unwrap_or(false)
               }) {
               Some(idx) => idx,
               None => return &[],
           };
   ```



##########
core/integration/tests/server/scenarios/read_during_persistence_scenario.rs:
##########
@@ -96,21 +104,23 @@ async fn 
should_read_messages_immediately_after_send_with_aggressive_persistence
     let topic_id = Identifier::named(TOPIC_NAME).unwrap();
     let consumer_kind = Consumer::default();
 
-    let test_duration = Duration::from_secs(TEST_DURATION_SECS);
-    let messages_per_batch = 32u32;
-    let payload = "X".repeat(1024);
+    let test_duration = Duration::from_secs(10);
+    let messages_per_batch = 10u32;
+    let payload = "X".repeat(512);
 
     let start = Instant::now();
     let mut batches_sent = 0u64;
-    let mut messages_sent = 0u64;
+    let mut next_offset = 0u64;
 
     println!(
-        "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
-        messages_per_batch, TEST_DURATION_SECS
+        "Starting background saver race test: {} msgs/batch, duration: {}s",
+        messages_per_batch,
+        test_duration.as_secs()
     );
+    println!("Inline persistence DISABLED (high thresholds), background saver 
every 100ms");

Review Comment:
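   The hardcoded value '100ms' in the message can become inconsistent with the 
actual interval set in the env_vars HashMap if the configuration is changed. 
Either derive the value from that configuration or, as suggested below, avoid 
stating a specific interval in the message.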
   ```suggestion
       println!("Inline persistence DISABLED (high thresholds), background saver enabled with configured interval");
   ```



##########
core/server/src/slab/streams.rs:
##########
@@ -1414,6 +1414,99 @@ impl Streams {
         Ok(batch_count)
     }
 
+    /// Persist frozen (immutable) batches to disk.
+    /// Assumes in-flight buffer is already set by caller.
+    /// Clears in-flight buffer after successful persist.
+    pub async fn persist_frozen_messages_to_disk(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        frozen_batches: &[IggyMessagesBatch],
+        config: &SystemConfig,
+    ) -> Result<u32, IggyError> {
+        let batch_count: u32 = frozen_batches.iter().map(|b| b.count()).sum();
+        let batch_size: u32 = frozen_batches.iter().map(|b| b.size()).sum();

Review Comment:
   The frozen_batches slice is iterated twice to calculate batch_count and 
batch_size. Consider using a single iteration with fold to compute both values 
simultaneously, reducing iteration overhead.
   ```suggestion
           let (batch_count, batch_size): (u32, u32) = frozen_batches
               .iter()
               .fold((0u32, 0u32), |(count_acc, size_acc), b| {
                   (count_acc + b.count(), size_acc + b.size())
               });
   ```



##########
core/server/src/slab/streams.rs:
##########
@@ -1414,6 +1414,99 @@ impl Streams {
         Ok(batch_count)
     }
 
+    /// Persist frozen (immutable) batches to disk.
+    /// Assumes in-flight buffer is already set by caller.
+    /// Clears in-flight buffer after successful persist.
+    pub async fn persist_frozen_messages_to_disk(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        frozen_batches: &[IggyMessagesBatch],
+        config: &SystemConfig,
+    ) -> Result<u32, IggyError> {
+        let batch_count: u32 = frozen_batches.iter().map(|b| b.count()).sum();
+        let batch_size: u32 = frozen_batches.iter().map(|b| b.size()).sum();
+
+        if batch_count == 0 {
+            return Ok(0);
+        }
+
+        let has_segments =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                log.has_segments()
+            });
+
+        if !has_segments {
+            return Ok(0);
+        }
+
+        let (messages_writer, index_writer) =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                (
+                    log.active_storage()
+                        .messages_writer
+                        .as_ref()
+                        .expect("Messages writer not initialized")
+                        .clone(),
+                    log.active_storage()
+                        .index_writer
+                        .as_ref()
+                        .expect("Index writer not initialized")
+                        .clone(),
+                )
+            });
+        let guard = messages_writer.lock.lock().await;
+
+        let saved = messages_writer
+            .as_ref()
+            .save_frozen_batches(frozen_batches)
+            .await
+            .error(|e: &IggyError| {
+                format!(
+                    "Failed to save batch of {batch_count} messages \
+                    ({batch_size} bytes) to stream ID: {stream_id}, topic ID: 
{topic_id}, partition ID: {partition_id}. {e}",
+                )
+            })?;
+
+        let unsaved_indexes_slice =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                log.active_indexes().unwrap().unsaved_slice()
+            });
+
+        let indexes_len = unsaved_indexes_slice.len();
+        index_writer
+            .as_ref()
+            .save_indexes(unsaved_indexes_slice)
+            .await
+            .error(|e: &IggyError| {
+                format!("Failed to save index of {indexes_len} indexes to 
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+            })?;
+
+        tracing::trace!(
+            "Persisted {} frozen messages on disk for stream ID: {}, topic ID: 
{}, for partition with ID: {}, total bytes written: {}.",
+            batch_count,
+            stream_id,
+            topic_id,
+            partition_id,
+            saved
+        );

Review Comment:
   This log message reads 'Persisted {} frozen messages on disk', whereas 
`persist_messages_to_disk` uses 'Persisted {} messages on disk' without the 
word 'frozen'. Consider aligning the wording with that existing convention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to