Copilot commented on code in PR #2578:
URL: https://github.com/apache/iggy/pull/2578#discussion_r2699735490
##########
core/server/src/streaming/partitions/in_flight.rs:
##########
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
return &[];
}
- &self.batches
+ let start_idx = self
+ .batches
+ .iter()
+ .position(|b| {
+ b.last_offset()
+ .map(|last| last >= start_offset)
+ .unwrap_or(false)
+ })
+ .unwrap_or(0);
+
+ let end_idx = self
+ .batches
+ .iter()
+ .rposition(|b| {
+ b.first_offset()
+ .map(|first| first <= end_offset)
+ .unwrap_or(false)
+ })
+ .map(|i| i + 1)
+ .unwrap_or(self.batches.len());
Review Comment:
When `rposition()` returns `None` (no batch has `first_offset <=
end_offset`), defaulting to `self.batches.len()` is incorrect. This would
include all batches when none actually overlap with the requested range. The
function should return an empty slice in this case. Handle this explicitly
before the slice operation.
```suggestion
let end_idx = match self
.batches
.iter()
.rposition(|b| {
b.first_offset()
.map(|first| first <= end_offset)
.unwrap_or(false)
}) {
Some(i) => i + 1,
None => return &[],
};
```
##########
core/server/src/streaming/partitions/in_flight.rs:
##########
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
return &[];
}
- &self.batches
+ let start_idx = self
+ .batches
+ .iter()
+ .position(|b| {
+ b.last_offset()
+ .map(|last| last >= start_offset)
+ .unwrap_or(false)
+ })
+ .unwrap_or(0);
Review Comment:
When `position()` returns `None` (no batch has `last_offset >=
start_offset`), defaulting to index 0 is incorrect: the resulting slice would
include batches that end before `start_offset` and should not be returned. The
function should return an empty slice in this case. Note that
`.unwrap_or_else(|| return &[])` would not work here — a `return` inside the
closure only returns from the closure, not from the enclosing function — so
handle the `None` case explicitly with a `match` before the slice operation.
```suggestion
let start_idx = match self
.batches
.iter()
.position(|b| {
b.last_offset()
.map(|last| last >= start_offset)
.unwrap_or(false)
}) {
Some(idx) => idx,
None => return &[],
};
```
##########
core/integration/tests/server/scenarios/read_during_persistence_scenario.rs:
##########
@@ -96,21 +104,23 @@ async fn
should_read_messages_immediately_after_send_with_aggressive_persistence
let topic_id = Identifier::named(TOPIC_NAME).unwrap();
let consumer_kind = Consumer::default();
- let test_duration = Duration::from_secs(TEST_DURATION_SECS);
- let messages_per_batch = 32u32;
- let payload = "X".repeat(1024);
+ let test_duration = Duration::from_secs(10);
+ let messages_per_batch = 10u32;
+ let payload = "X".repeat(512);
let start = Instant::now();
let mut batches_sent = 0u64;
- let mut messages_sent = 0u64;
+ let mut next_offset = 0u64;
println!(
- "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
- messages_per_batch, TEST_DURATION_SECS
+ "Starting background saver race test: {} msgs/batch, duration: {}s",
+ messages_per_batch,
+ test_duration.as_secs()
);
+ println!("Inline persistence DISABLED (high thresholds), background saver
every 100ms");
Review Comment:
The hardcoded value '100ms' in the message should reference the actual value
from the env_vars HashMap to avoid inconsistency if the configuration is
changed.
```suggestion
println!("Inline persistence DISABLED (high thresholds), background
saver enabled with configured interval");
```
##########
core/server/src/slab/streams.rs:
##########
@@ -1414,6 +1414,99 @@ impl Streams {
Ok(batch_count)
}
+ /// Persist frozen (immutable) batches to disk.
+ /// Assumes in-flight buffer is already set by caller.
+ /// Clears in-flight buffer after successful persist.
+ pub async fn persist_frozen_messages_to_disk(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ frozen_batches: &[IggyMessagesBatch],
+ config: &SystemConfig,
+ ) -> Result<u32, IggyError> {
+ let batch_count: u32 = frozen_batches.iter().map(|b| b.count()).sum();
+ let batch_size: u32 = frozen_batches.iter().map(|b| b.size()).sum();
Review Comment:
The frozen_batches slice is iterated twice to calculate batch_count and
batch_size. Consider using a single iteration with fold to compute both values
simultaneously, reducing iteration overhead.
```suggestion
let (batch_count, batch_size): (u32, u32) = frozen_batches
.iter()
.fold((0u32, 0u32), |(count_acc, size_acc), b| {
(count_acc + b.count(), size_acc + b.size())
});
```
##########
core/server/src/slab/streams.rs:
##########
@@ -1414,6 +1414,99 @@ impl Streams {
Ok(batch_count)
}
+ /// Persist frozen (immutable) batches to disk.
+ /// Assumes in-flight buffer is already set by caller.
+ /// Clears in-flight buffer after successful persist.
+ pub async fn persist_frozen_messages_to_disk(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ frozen_batches: &[IggyMessagesBatch],
+ config: &SystemConfig,
+ ) -> Result<u32, IggyError> {
+ let batch_count: u32 = frozen_batches.iter().map(|b| b.count()).sum();
+ let batch_size: u32 = frozen_batches.iter().map(|b| b.size()).sum();
+
+ if batch_count == 0 {
+ return Ok(0);
+ }
+
+ let has_segments =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.has_segments()
+ });
+
+ if !has_segments {
+ return Ok(0);
+ }
+
+ let (messages_writer, index_writer) =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ (
+ log.active_storage()
+ .messages_writer
+ .as_ref()
+ .expect("Messages writer not initialized")
+ .clone(),
+ log.active_storage()
+ .index_writer
+ .as_ref()
+ .expect("Index writer not initialized")
+ .clone(),
+ )
+ });
+ let guard = messages_writer.lock.lock().await;
+
+ let saved = messages_writer
+ .as_ref()
+ .save_frozen_batches(frozen_batches)
+ .await
+ .error(|e: &IggyError| {
+ format!(
+ "Failed to save batch of {batch_count} messages \
+ ({batch_size} bytes) to stream ID: {stream_id}, topic ID:
{topic_id}, partition ID: {partition_id}. {e}",
+ )
+ })?;
+
+ let unsaved_indexes_slice =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.active_indexes().unwrap().unsaved_slice()
+ });
+
+ let indexes_len = unsaved_indexes_slice.len();
+ index_writer
+ .as_ref()
+ .save_indexes(unsaved_indexes_slice)
+ .await
+ .error(|e: &IggyError| {
+ format!("Failed to save index of {indexes_len} indexes to
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+ })?;
+
+ tracing::trace!(
+ "Persisted {} frozen messages on disk for stream ID: {}, topic ID:
{}, for partition with ID: {}, total bytes written: {}.",
+ batch_count,
+ stream_id,
+ topic_id,
+ partition_id,
+ saved
+ );
Review Comment:
Corrected 'Persisted' to match the convention used in
persist_messages_to_disk which says 'Persisted {} messages on disk' without the
word 'frozen'.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]