This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 202fda739 fix(server): properly cleanup messages in message cleaner
(#2670)
202fda739 is described below
commit 202fda7396b1500512403e4c7ddda3ba52b6875e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 4 16:57:52 2026 +0100
fix(server): properly cleanup messages in message cleaner (#2670)
Segments created after server startup were using the server's
default message_expiry (NeverExpire) rather than the topic's
configured value, causing messages to never be cleaned up even
when expiry was set.
- Propagate topic's message_expiry to all segment creation sites
- Move message_expiry config from [system.segment] to [system.topic]
- Remove delete_oldest_segments toggle
- Seal segments on rotation so is_expired() can evaluate them
- Add comprehensive tests for time-based, size-based, and combined retention
Closes #2629.
---------
Co-authored-by: Grzegorz Koszyk
<[email protected]>
---
core/common/src/utils/duration.rs | 2 +-
core/integration/tests/config_provider/mod.rs | 9 +-
core/integration/tests/server/message_cleanup.rs | 124 +++++
core/integration/tests/server/mod.rs | 1 +
.../server/scenarios/message_cleanup_scenario.rs | 588 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/integration/tests/server/specific.rs | 6 -
core/server/config.toml | 66 +--
core/server/src/bootstrap.rs | 19 +-
core/server/src/configs/defaults.rs | 3 +-
core/server/src/configs/displays.rs | 8 +-
core/server/src/configs/system.rs | 9 +-
core/server/src/configs/validators.rs | 110 ++--
core/server/src/shard/system/messages.rs | 9 +-
core/server/src/shard/system/partitions.rs | 6 +-
core/server/src/shard/system/segments.rs | 27 +-
.../src/shard/tasks/periodic/message_cleaner.rs | 224 +++++---
core/server/src/streaming/segments/segment.rs | 31 +-
18 files changed, 974 insertions(+), 269 deletions(-)
diff --git a/core/common/src/utils/duration.rs
b/core/common/src/utils/duration.rs
index aa6c796a7..f8c8de92e 100644
--- a/core/common/src/utils/duration.rs
+++ b/core/common/src/utils/duration.rs
@@ -101,7 +101,7 @@ impl IggyDuration {
}
pub fn is_zero(&self) -> bool {
- self.duration.as_secs() == 0
+ self.duration.is_zero()
}
pub fn abs_diff(&self, other: IggyDuration) -> IggyDuration {
diff --git a/core/integration/tests/config_provider/mod.rs
b/core/integration/tests/config_provider/mod.rs
index d96ba5fc7..adf67eac5 100644
--- a/core/integration/tests/config_provider/mod.rs
+++ b/core/integration/tests/config_provider/mod.rs
@@ -43,10 +43,7 @@ async fn validate_config_env_override() {
"IGGY_MESSAGE_SAVER_ENABLED",
expected_message_saver.to_string(),
);
- env::set_var(
- "IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY",
- expected_message_expiry,
- );
+ env::set_var("IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY",
expected_message_expiry);
}
let config_path = get_root_path().join("../server/config.toml");
@@ -61,7 +58,7 @@ async fn validate_config_env_override() {
assert_eq!(config.tcp.enabled, expected_tcp);
assert_eq!(config.message_saver.enabled, expected_message_saver);
assert_eq!(
- config.system.segment.message_expiry.to_string(),
+ config.system.topic.message_expiry.to_string(),
expected_message_expiry
);
@@ -69,7 +66,7 @@ async fn validate_config_env_override() {
env::remove_var("IGGY_HTTP_ENABLED");
env::remove_var("IGGY_TCP_ENABLED");
env::remove_var("IGGY_MESSAGE_SAVER_ENABLED");
- env::remove_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY");
+ env::remove_var("IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY");
}
}
diff --git a/core/integration/tests/server/message_cleanup.rs
b/core/integration/tests/server/message_cleanup.rs
new file mode 100644
index 000000000..c61855d83
--- /dev/null
+++ b/core/integration/tests/server/message_cleanup.rs
@@ -0,0 +1,124 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::message_cleanup_scenario;
+use iggy::prelude::*;
+use integration::harness::{TestHarness, TestServerConfig};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::future::Future;
+use std::path::Path;
+use std::pin::Pin;
+use test_case::test_matrix;
+
+type CleanupScenarioFn =
+ for<'a> fn(&'a IggyClient, &'a Path) -> Pin<Box<dyn Future<Output = ()> +
'a>>;
+
+fn expiry_after_rotation() -> CleanupScenarioFn {
+ |client, path| {
+ Box::pin(message_cleanup_scenario::run_expiry_after_rotation(
+ client, path,
+ ))
+ }
+}
+
+fn active_segment_protection() -> CleanupScenarioFn {
+ |client, path| {
+ Box::pin(message_cleanup_scenario::run_active_segment_protection(
+ client, path,
+ ))
+ }
+}
+
+fn size_based_retention() -> CleanupScenarioFn {
+ |client, path| {
+ Box::pin(message_cleanup_scenario::run_size_based_retention(
+ client, path,
+ ))
+ }
+}
+
+fn combined_retention() -> CleanupScenarioFn {
+ |client, path| {
+ Box::pin(message_cleanup_scenario::run_combined_retention(
+ client, path,
+ ))
+ }
+}
+
+fn expiry_multipartition() -> CleanupScenarioFn {
+ |client, path| {
+
Box::pin(message_cleanup_scenario::run_expiry_with_multiple_partitions(client,
path))
+ }
+}
+
+fn fair_size_cleanup_multipartition() -> CleanupScenarioFn {
+ |client, path| {
+
Box::pin(message_cleanup_scenario::run_fair_size_based_cleanup_multipartition(client,
path))
+ }
+}
+
+async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
+ let mut harness = TestHarness::builder()
+ .server(
+ TestServerConfig::builder()
+ .extra_envs(HashMap::from([
+ ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"100KiB".to_string()),
+ (
+
"IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED".to_string(),
+ "true".to_string(),
+ ),
+ (
+ "IGGY_DATA_MAINTENANCE_MESSAGES_INTERVAL".to_string(),
+ "100ms".to_string(),
+ ),
+ (
+
"IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+ "1".to_string(),
+ ),
+ (
+ "IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_string(),
+ "true".to_string(),
+ ),
+ ]))
+ .build(),
+ )
+ .build()
+ .unwrap();
+
+ harness.start().await.unwrap();
+
+ let client = harness.tcp_root_client().await.unwrap();
+ let data_path = harness.server().data_path();
+
+ scenario(&client, &data_path).await;
+}
+
+#[test_matrix([
+ expiry_after_rotation(),
+ active_segment_protection(),
+ size_based_retention(),
+ combined_retention(),
+ expiry_multipartition(),
+ fair_size_cleanup_multipartition(),
+])]
+#[tokio::test]
+#[parallel]
+async fn message_cleanup(scenario: CleanupScenarioFn) {
+ run_cleanup_scenario(scenario).await;
+}
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index de58d27cc..376746436 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -19,6 +19,7 @@
mod cg;
mod concurrent_addition;
mod general;
+mod message_cleanup;
mod message_retrieval;
mod scenarios;
mod specific;
diff --git
a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
new file mode 100644
index 000000000..066953153
--- /dev/null
+++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for message retention policies (time-based and size-based).
+//!
+//! Configuration: 100KB segment size, 100ms cleaner interval, instant flush.
+//! Message size: 64B header + 936B payload = 1KB per message.
+//! Therefore: 100 messages = 1 segment, 101+ messages = 2+ segments.
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use iggy_common::IggyByteSize;
+use std::fs::{DirEntry, read_dir};
+use std::path::Path;
+use std::time::Duration;
+
+const STREAM_NAME: &str = "test_expiry_stream";
+const TOPIC_NAME: &str = "test_expiry_topic";
+const PARTITION_ID: u32 = 0;
+const LOG_EXTENSION: &str = "log";
+
+/// Payload size chosen so that header (64B) + payload = 1KB per message.
+const PAYLOAD_SIZE: usize = 936;
+
+/// Buffer time for cleaner to run after expiry conditions are met.
+const CLEANER_BUFFER: Duration = Duration::from_millis(300);
+
+fn make_payload(fill: char) -> Bytes {
+ Bytes::from(fill.to_string().repeat(PAYLOAD_SIZE))
+}
+
+/// Tests time-based retention: segments are cleaned up after expiry.
+pub async fn run_expiry_after_rotation(client: &IggyClient, data_path: &Path) {
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
+
+ let expiry = Duration::from_secs(2);
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let partition_path = data_path
+ .join(format!(
+ "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+ ))
+ .display()
+ .to_string();
+
+ // Send 110 messages (1KB each) to create 2 segments (100KB segment size)
+ let payload = make_payload('A');
+ let total_messages = 110;
+
+ for i in 0..total_messages {
+ let message = IggyMessage::builder()
+ .id(i as u128)
+ .payload(payload.clone())
+ .build()
+ .unwrap();
+
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+
+ let initial_segments = get_segment_paths_for_partition(&partition_path);
+ let initial_count = initial_segments.len();
+ assert!(
+ initial_count >= 2,
+ "Expected at least 2 segments but got {}",
+ initial_count
+ );
+
+ // Verify we can poll all messages before expiry
+ let polled_before = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ total_messages as u32,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled_before.messages.len(),
+ total_messages as usize,
+ "Should poll all messages before expiry"
+ );
+
+ // Wait for expiry + cleaner
+ tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+ let remaining_segments = get_segment_paths_for_partition(&partition_path);
+ assert!(
+ remaining_segments.len() < initial_count,
+ "Expected segments to be deleted after expiry"
+ );
+ assert!(
+ !remaining_segments.is_empty(),
+ "Active segment should not be deleted"
+ );
+
+ // Verify fewer messages available after cleanup
+ let polled_after = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ total_messages as u32,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert!(
+ polled_after.messages.len() < polled_before.messages.len(),
+ "Expected fewer messages after cleanup"
+ );
+
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
+
+/// Tests that the active segment is never deleted, even if expired.
+pub async fn run_active_segment_protection(client: &IggyClient, data_path:
&Path) {
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
+
+ let expiry = Duration::from_secs(1);
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let partition_path = data_path
+ .join(format!(
+ "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+ ))
+ .display()
+ .to_string();
+
+ // Send one small message (stays in active segment, no rotation)
+ let message = IggyMessage::builder()
+ .id(1u128)
+ .payload(Bytes::from("small"))
+ .build()
+ .unwrap();
+
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+
+ let initial_segments = get_segment_paths_for_partition(&partition_path);
+ assert_eq!(initial_segments.len(), 1, "Should have exactly 1 segment");
+
+ // Wait for expiry + cleaner
+ tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+ let remaining_segments = get_segment_paths_for_partition(&partition_path);
+ assert_eq!(
+ remaining_segments.len(),
+ 1,
+ "Active segment should NOT be deleted even after expiry"
+ );
+
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
+
+/// Tests size-based retention: oldest segments deleted when topic exceeds
max_size.
+pub async fn run_size_based_retention(client: &IggyClient, data_path: &Path) {
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
+
+ // 150KB max, cleanup at 90% = 135KB. With 100KB segments, exceeding 135KB
triggers cleanup.
+ let max_size_bytes = 150 * 1024;
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::Custom(IggyByteSize::from(max_size_bytes)),
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let partition_path = data_path
+ .join(format!(
+ "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+ ))
+ .display()
+ .to_string();
+
+ // Send 160 messages (160KB) to exceed 90% threshold (135KB)
+ let payload = make_payload('B');
+ let total_messages = 160;
+
+ for i in 0..total_messages {
+ let message = IggyMessage::builder()
+ .id(i as u128)
+ .payload(payload.clone())
+ .build()
+ .unwrap();
+
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+
+ // Wait for cleaner
+ tokio::time::sleep(CLEANER_BUFFER).await;
+
+ let remaining_segments = get_segment_paths_for_partition(&partition_path);
+
+ // Verify oldest messages deleted (first offset > 0)
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ total_messages as u32,
+ false,
+ )
+ .await
+ .unwrap();
+
+ let first_offset = polled
+ .messages
+ .first()
+ .map(|m| m.header.offset)
+ .unwrap_or(0);
+
+ assert!(
+ first_offset > 0,
+ "Oldest messages should be deleted, first_offset should be > 0"
+ );
+ assert!(
+ polled.messages.len() < total_messages as usize,
+ "Some messages should be deleted"
+ );
+ assert!(
+ !remaining_segments.is_empty(),
+ "Active segment should not be deleted"
+ );
+
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
+
+/// Tests both retention policies together: time-based AND size-based.
+pub async fn run_combined_retention(client: &IggyClient, data_path: &Path) {
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
+
+ let expiry = Duration::from_secs(2);
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+ MaxTopicSize::Custom(IggyByteSize::from(500 * 1024)), // 500KB
(won't trigger)
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let partition_path = data_path
+ .join(format!(
+ "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+ ))
+ .display()
+ .to_string();
+
+ // Send 110 messages to create 2 segments (under size threshold, but will
expire)
+ let payload = make_payload('C');
+ for i in 0..110 {
+ let message = IggyMessage::builder()
+ .id(i as u128)
+ .payload(payload.clone())
+ .build()
+ .unwrap();
+
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+
+ let initial_segments = get_segment_paths_for_partition(&partition_path);
+ let initial_count = initial_segments.len();
+ assert!(initial_count >= 2, "Expected at least 2 segments");
+
+ // Wait for time-based expiry
+ tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+ let remaining_segments = get_segment_paths_for_partition(&partition_path);
+ assert!(
+ remaining_segments.len() < initial_count,
+ "Segments should be deleted after expiry"
+ );
+ assert!(
+ !remaining_segments.is_empty(),
+ "Active segment should not be deleted"
+ );
+
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
+
+/// Tests time-based retention with multiple partitions.
+pub async fn run_expiry_with_multiple_partitions(client: &IggyClient,
data_path: &Path) {
+ const PARTITIONS_COUNT: u32 = 3;
+
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
+
+ let expiry = Duration::from_secs(3);
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ PARTITIONS_COUNT,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let payload = make_payload('D');
+ let messages_per_partition = 110;
+
+ // Send messages to all partitions
+ for partition_id in 0..PARTITIONS_COUNT {
+ for i in 0..messages_per_partition {
+ let msg_id = partition_id as u128 * 1000 + i as u128;
+ let message = IggyMessage::builder()
+ .id(msg_id)
+ .payload(payload.clone())
+ .build()
+ .unwrap();
+
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(partition_id),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+ }
+
+ // Collect initial segment counts
+ let mut initial_counts: Vec<usize> = Vec::new();
+ for partition_id in 0..PARTITIONS_COUNT {
+ let partition_path = data_path
+ .join(format!(
+
"streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}"
+ ))
+ .display()
+ .to_string();
+ let segments = get_segment_paths_for_partition(&partition_path);
+ initial_counts.push(segments.len());
+ assert!(
+ segments.len() >= 2,
+ "Partition {} should have at least 2 segments, got {}",
+ partition_id,
+ segments.len()
+ );
+ }
+
+ // Wait for expiry + cleaner
+ tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+ // Verify cleanup in all partitions
+ let mut total_deleted = 0usize;
+ for partition_id in 0..PARTITIONS_COUNT {
+ let partition_path = data_path
+ .join(format!(
+
"streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}"
+ ))
+ .display()
+ .to_string();
+ let remaining = get_segment_paths_for_partition(&partition_path);
+ let deleted = initial_counts[partition_id as
usize].saturating_sub(remaining.len());
+ total_deleted += deleted;
+
+ assert!(
+ !remaining.is_empty(),
+ "Partition {} should retain active segment",
+ partition_id
+ );
+ }
+
+ assert!(
+ total_deleted > 0,
+ "At least some segments should be deleted"
+ );
+
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
+
+/// Tests fair size-based cleanup across multiple partitions.
+pub async fn run_fair_size_based_cleanup_multipartition(client: &IggyClient,
data_path: &Path) {
+ const PARTITIONS_COUNT: u32 = 3;
+
+ let stream = client.create_stream(STREAM_NAME).await.unwrap();
+ let stream_id = stream.id;
+
+ // 200KB max, cleanup at 90% = 180KB
+ let max_size_bytes = 200 * 1024;
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ PARTITIONS_COUNT,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::Custom(IggyByteSize::from(max_size_bytes)),
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let payload = make_payload('E');
+
+ // Send 70 messages per partition = 210KB total, exceeds 180KB threshold
+ for partition_id in 0..PARTITIONS_COUNT {
+ for i in 0..70 {
+ let msg_id = partition_id as u128 * 1000 + i as u128;
+ let message = IggyMessage::builder()
+ .id(msg_id)
+ .payload(payload.clone())
+ .build()
+ .unwrap();
+
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(partition_id),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+ }
+
+ // Wait for cleaner
+ tokio::time::sleep(CLEANER_BUFFER).await;
+
+ // Verify segments exist for all partitions
+ for partition_id in 0..PARTITIONS_COUNT {
+ let partition_path = data_path
+ .join(format!(
+
"streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}"
+ ))
+ .display()
+ .to_string();
+ let segments = get_segment_paths_for_partition(&partition_path);
+ assert!(
+ !segments.is_empty(),
+ "Partition {} should have at least 1 segment",
+ partition_id
+ );
+ }
+
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
+
+fn get_segment_paths_for_partition(partition_path: &str) -> Vec<DirEntry> {
+ read_dir(partition_path)
+ .map(|read_dir| {
+ read_dir
+ .filter_map(|dir_entry| {
+ dir_entry
+ .map(|dir_entry| {
+ match dir_entry
+ .path()
+ .extension()
+ .is_some_and(|ext| ext == LOG_EXTENSION)
+ {
+ true => Some(dir_entry),
+ false => None,
+ }
+ })
+ .ok()
+ .flatten()
+ })
+ .collect::<Vec<_>>()
+ })
+ .unwrap_or_default()
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index b04866f37..90d45676f 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -30,6 +30,7 @@ pub mod cross_protocol_pat_scenario;
pub mod delete_segments_scenario;
pub mod encryption_scenario;
pub mod log_rotation_scenario;
+pub mod message_cleanup_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod offset_scenario;
diff --git a/core/integration/tests/server/specific.rs
b/core/integration/tests/server/specific.rs
index afd4a3a0c..501099a83 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -241,28 +241,22 @@ async fn
should_handle_single_message_per_batch_with_delayed_persistence() {
async fn segment_rotation_scenario() {
let mut extra_envs = HashMap::new();
- // Very small segment to trigger frequent rotations
extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"512B".to_string());
- // Short message saver interval to add concurrent persist operations
extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(),
"1s".to_string());
- // Small threshold to trigger more frequent saves
extra_envs.insert(
"IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
"32".to_string(),
);
- // cache_indexes = none triggers clear_active_indexes in
handle_full_segment
extra_envs.insert(
"IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
"none".to_string(),
);
- // Disable socket migration to keep all connections on same shard
extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(),
"false".to_string());
- // Enable TCP nodelay for faster message throughput
extra_envs.insert(
"IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
"true".to_string(),
diff --git a/core/server/config.toml b/core/server/config.toml
index 3949b9cd5..372d9da94 100644
--- a/core/server/config.toml
+++ b/core/server/config.toml
@@ -415,25 +415,28 @@ default_algorithm = "none"
# Specifies the directory where stream data is stored, relative to
`system.path`.
path = "streams"
-# Topic configuration
+# Topic configuration - default settings for new topics
[system.topic]
-# Path for storing topic-related data (string).
-# Specifies the directory where topic data is stored, relative to
`stream.path`.
+# Path for storing topic-related data, relative to `stream.path`.
path = "topics"
-# Configures the topic size-based expiry setting.
-# "unlimited" or "0" means topics are kept indefinitely.
-# A size value in human-readable format determines the maximum size of a topic.
-# When a topic reaches this size, the oldest messages are deleted to make room
for new ones.
-# Messages are removed in full segments, so if segment size is 1 GiB and the
topic size is 10 GiB,
-# the oldest segment will be deleted upon reaching 10 GiB.
-# Example: `max_topic_size = "10 GiB"` means oldest messages in topics will be
deleted when they reach 10 GiB.
-# Note: this setting can be overwritten with CreateTopic and UpdateTopic
requests.
+# Messages can be deleted based on two independent policies:
+# 1. Size-based: delete oldest segments when topic exceeds max_size
+# 2. Time-based: delete segments older than message_expiry
+# Both can be active simultaneously. Per-topic overrides via
CreateTopic/UpdateTopic.
+
+# Maximum topic size before oldest segments are deleted.
+# "unlimited" or "0" = no size limit (messages kept indefinitely).
+# When 90% of this limit is reached, oldest segments are removed to make room.
+# Applies to sealed segments only (active segment is protected).
+# Example: "10 GiB"
max_size = "unlimited"
-# Configures whether the oldest segments are deleted when a topic reaches its
maximum size (boolean).
-# Note: segments are removed in intervals defined by
`system.message_cleaner.interval`.
-delete_oldest_segments = false
+# Maximum age of messages before segments are deleted.
+# "none" = no time limit (messages kept indefinitely).
+# Applies to sealed segments only (active segment is protected).
+# Example: "7 days", "2 days 4 hours 15 minutes"
+message_expiry = "none"
# Partition configuration
[system.partition]
@@ -451,23 +454,15 @@ enforce_fsync = false
# `false` skips these checks for faster loading at the risk of undetected
corruption.
validate_checksum = false
-# The count threshold of buffered messages before triggering a save to disk
(integer).
-# Specifies how many messages accumulate before persisting to storage.
-# Adjusting this can balance between write performance and data durability.
-# This is soft limit, actual number of messages may be higher, depending on
last batch size.
-# Together with `size_of_messages_required_to_save` it defines the threshold
of buffered messages.
-# Minimum value is 32. Value has to be a multiple of 32 due to minimum
-# direct I/O block size (512 bytes) and message index size (16 bytes per
message).
-# With direct I/O, writes must occur in blocks of at least 512 bytes, which
equals 32 message indices.
+# The count threshold of buffered messages before triggering a save to disk.
+# Together with `size_of_messages_required_to_save` it defines the threshold.
+# This is a soft limit - actual count may be higher depending on last batch
size.
+# Minimum value is 1.
messages_required_to_save = 1024
-# The size threshold of buffered messages before triggering a save to disk
(string).
-# Specifies how much size of messages accumulate before persisting to storage.
-# Adjusting this can balance between write performance and data durability.
-# This is soft limit, actual number of messages may be higher, depending on
last batch size.
-# Together with `messages_required_to_save` it defines the threshold of
buffered messages.
-# Minimum value is 512 B. Value has to be a multiple of 512 B due to direct
I/O requirements.
-# Direct I/O operations must align with the underlying storage block size
(typically 512 B or 4 KiB).
+# The size threshold of buffered messages before triggering a save to disk.
+# Together with `messages_required_to_save` it defines the threshold.
+# This is a soft limit - actual size may be higher depending on last batch
size.
size_of_messages_required_to_save = "1 MiB"
# Segment configuration
@@ -478,19 +473,6 @@ size_of_messages_required_to_save = "1 MiB"
# Maximum size is 1 GiB. Size has to be a multiple of 512 B.
size = "1 GiB"
-# Configures the message time-based expiry setting.
-# "none" means messages are kept indefinitely.
-# A time value in human-readable format determines the lifespan of messages.
-# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will
expire after that duration.
-message_expiry = "none"
-
-# Defines the file system confirmation behavior during state updates.
-# Controls how the system waits for file write operations to complete.
-# Possible values:
-# - "wait": waits for the file operation to complete before proceeding.
-# - "no_wait": proceeds without waiting for the file operation to finish,
potentially increasing performance but at the cost of durability.
-server_confirmation = "wait"
-
# Configures whether expired segments are archived (boolean) or just deleted
without archiving.
archive_expired = false
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 70703e43b..a73281637 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -250,7 +250,6 @@ pub async fn load_segments(
let messages_file_path = format!("{}/{}.{}", partition_path,
log_file_name, LOG_EXTENSION);
let index_file_path = format!("{}/{}.{}", partition_path,
log_file_name, INDEX_EXTENSION);
- let time_index_path = index_file_path.replace(INDEX_EXTENSION,
"timeindex");
async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
match compio::fs::metadata(path).await {
@@ -263,13 +262,12 @@ pub async fn load_segments(
}
let index_path_exists = try_exists(&index_file_path).await.unwrap();
- let time_index_path_exists =
try_exists(&time_index_path).await.unwrap();
let index_cache_enabled = matches!(
config.segment.cache_indexes,
CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
);
- if index_cache_enabled && (!index_path_exists ||
time_index_path_exists) {
+ if index_cache_enabled && !index_path_exists {
warn!(
"Index at path {} does not exist, rebuilding it based on
{}...",
index_file_path, messages_file_path
@@ -293,10 +291,6 @@ pub async fn load_segments(
);
}
- if time_index_path_exists {
- compio::fs::remove_file(&time_index_path).await.unwrap();
- }
-
let messages_metadata = compio::fs::metadata(&messages_file_path)
.await
.map_err(|_| IggyError::CannotReadPartitions)?;
@@ -345,11 +339,7 @@ pub async fn load_segments(
)
};
- let mut segment = Segment::new(
- start_offset,
- config.segment.size,
- config.segment.message_expiry,
- );
+ let mut segment = Segment::new(start_offset, config.segment.size);
segment.start_timestamp = start_timestamp;
segment.end_timestamp = end_timestamp;
@@ -438,6 +428,11 @@ pub async fn load_segments(
}
}
+ // The last segment is the active one and must remain unsealed for writes
+ if log.has_segments() {
+ log.segments_mut().last_mut().unwrap().sealed = false;
+ }
+
if matches!(
config.segment.cache_indexes,
CacheIndexesConfig::OpenSegment
diff --git a/core/server/src/configs/defaults.rs
b/core/server/src/configs/defaults.rs
index ce9a84ab4..90979caa4 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -443,7 +443,7 @@ impl Default for TopicConfig {
TopicConfig {
path: SERVER_CONFIG.system.topic.path.parse().unwrap(),
max_size: SERVER_CONFIG.system.topic.max_size.parse().unwrap(),
- delete_oldest_segments:
SERVER_CONFIG.system.topic.delete_oldest_segments,
+ message_expiry:
SERVER_CONFIG.system.topic.message_expiry.parse().unwrap(),
}
}
}
@@ -471,7 +471,6 @@ impl Default for SegmentConfig {
SegmentConfig {
size: SERVER_CONFIG.system.segment.size.parse().unwrap(),
cache_indexes:
SERVER_CONFIG.system.segment.cache_indexes.parse().unwrap(),
- message_expiry:
SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(),
archive_expired: SERVER_CONFIG.system.segment.archive_expired,
}
}
diff --git a/core/server/src/configs/displays.rs
b/core/server/src/configs/displays.rs
index 515011c25..64faa95b7 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -205,8 +205,8 @@ impl Display for TopicConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ path: {}, max_size: {}, delete_oldest_segments: {} }}",
- self.path, self.max_size, self.delete_oldest_segments
+ "{{ path: {}, max_size: {}, message_expiry: {} }}",
+ self.path, self.max_size, self.message_expiry
)
}
}
@@ -239,8 +239,8 @@ impl Display for SegmentConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {},
archive_expired: {} }}",
- self.size, self.cache_indexes, self.message_expiry,
self.archive_expired,
+ "{{ size_bytes: {}, cache_indexes: {}, archive_expired: {} }}",
+ self.size, self.cache_indexes, self.archive_expired,
)
}
}
diff --git a/core/server/src/configs/system.rs
b/core/server/src/configs/system.rs
index 8e1205967..58de89ed5 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -119,7 +119,9 @@ pub struct TopicConfig {
#[config_env(leaf)]
#[serde_as(as = "DisplayFromStr")]
pub max_size: MaxTopicSize,
- pub delete_oldest_segments: bool,
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub message_expiry: IggyExpiry,
}
#[derive(Debug, Deserialize, Serialize, ConfigEnv)]
@@ -154,9 +156,6 @@ pub struct SegmentConfig {
pub size: IggyByteSize,
#[config_env(leaf)]
pub cache_indexes: CacheIndexesConfig,
- #[config_env(leaf)]
- #[serde_as(as = "DisplayFromStr")]
- pub message_expiry: IggyExpiry,
pub archive_expired: bool,
}
@@ -328,7 +327,7 @@ impl SystemConfig {
pub fn resolve_message_expiry(&self, message_expiry: IggyExpiry) ->
IggyExpiry {
match message_expiry {
- IggyExpiry::ServerDefault => self.segment.message_expiry,
+ IggyExpiry::ServerDefault => self.topic.message_expiry,
_ => message_expiry,
}
}
diff --git a/core/server/src/configs/validators.rs
b/core/server/src/configs/validators.rs
index ceb0fe30d..b9d70aaf3 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -35,7 +35,7 @@ use iggy_common::IggyExpiry;
use iggy_common::MaxTopicSize;
use iggy_common::Validatable;
use std::thread::available_parallelism;
-use tracing::{error, warn};
+use tracing::warn;
impl Validatable<ConfigurationError> for ServerConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
@@ -92,20 +92,30 @@ impl Validatable<ConfigurationError> for ServerConfig {
let topic_size = match self.system.topic.max_size {
MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
MaxTopicSize::Unlimited => Ok(u64::MAX),
- MaxTopicSize::ServerDefault =>
Err(ConfigurationError::InvalidConfigurationValue),
+ MaxTopicSize::ServerDefault => {
+ eprintln!("system.topic.max_size cannot be ServerDefault in
server config");
+ Err(ConfigurationError::InvalidConfigurationValue)
+ }
}?;
- if let IggyExpiry::ServerDefault = self.system.segment.message_expiry {
+ if let IggyExpiry::ServerDefault = self.system.topic.message_expiry {
+ eprintln!("system.topic.message_expiry cannot be ServerDefault in
server config");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if self.http.enabled
&& let IggyExpiry::ServerDefault =
self.http.jwt.access_token_expiry
{
+ eprintln!("http.jwt.access_token_expiry cannot be ServerDefault
when HTTP is enabled");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if topic_size < self.system.segment.size.as_bytes_u64() {
+ eprintln!(
+ "system.topic.max_size ({} B) must be >= system.segment.size
({} B)",
+ topic_size,
+ self.system.segment.size.as_bytes_u64()
+ );
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -134,14 +144,17 @@ impl Validatable<ConfigurationError> for TelemetryConfig {
}
if self.service_name.trim().is_empty() {
+ eprintln!("telemetry.service_name cannot be empty when telemetry
is enabled");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if self.logs.endpoint.is_empty() {
+ eprintln!("telemetry.logs.endpoint cannot be empty when telemetry
is enabled");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if self.traces.endpoint.is_empty() {
+ eprintln!("telemetry.traces.endpoint cannot be empty when
telemetry is enabled");
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -151,39 +164,8 @@ impl Validatable<ConfigurationError> for TelemetryConfig {
impl Validatable<ConfigurationError> for PartitionConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
- if self.messages_required_to_save < 32 {
- error!(
- "Configured system.partition.messages_required_to_save {} is
less than minimum {}",
- self.messages_required_to_save, 32
- );
- return Err(ConfigurationError::InvalidConfigurationValue);
- }
-
- if !self.messages_required_to_save.is_multiple_of(32) {
- error!(
- "Configured system.partition.messages_required_to_save {} is
not a multiple of 32",
- self.messages_required_to_save
- );
- return Err(ConfigurationError::InvalidConfigurationValue);
- }
-
- if self.size_of_messages_required_to_save < 512 {
- error!(
- "Configured system.partition.size_of_messages_required_to_save
{} is less than minimum {}",
- self.size_of_messages_required_to_save, 512
- );
- return Err(ConfigurationError::InvalidConfigurationValue);
- }
-
- if !self
- .size_of_messages_required_to_save
- .as_bytes_u64()
- .is_multiple_of(512)
- {
- error!(
- "Configured system.partition.size_of_messages_required_to_save
{} is not a multiple of 512 B",
- self.size_of_messages_required_to_save
- );
+ if self.messages_required_to_save == 0 {
+ eprintln!("Configured system.partition.messages_required_to_save
cannot be 0");
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -194,7 +176,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
impl Validatable<ConfigurationError> for SegmentConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.size > SEGMENT_MAX_SIZE_BYTES {
- error!(
+ eprintln!(
"Configured system.segment.size {} B is greater than maximum
{} B",
self.size.as_bytes_u64(),
SEGMENT_MAX_SIZE_BYTES
@@ -203,7 +185,7 @@ impl Validatable<ConfigurationError> for SegmentConfig {
}
if !self.size.as_bytes_u64().is_multiple_of(512) {
- error!(
+ eprintln!(
"Configured system.segment.size {} B is not a multiple of 512
B",
self.size.as_bytes_u64()
);
@@ -217,6 +199,7 @@ impl Validatable<ConfigurationError> for SegmentConfig {
impl Validatable<ConfigurationError> for MessageSaverConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.enabled && self.interval.is_zero() {
+ eprintln!("message_saver.interval cannot be zero when
message_saver is enabled");
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -236,6 +219,7 @@ impl Validatable<ConfigurationError> for
DataMaintenanceConfig {
impl Validatable<ConfigurationError> for MessagesMaintenanceConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.cleaner_enabled && self.interval.is_zero() {
+ eprintln!("data_maintenance.messages.interval cannot be zero when
cleaner is enabled");
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -246,10 +230,14 @@ impl Validatable<ConfigurationError> for
MessagesMaintenanceConfig {
impl Validatable<ConfigurationError> for PersonalAccessTokenConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.max_tokens_per_user == 0 {
+ eprintln!("personal_access_token.max_tokens_per_user cannot be 0");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if self.cleaner.enabled && self.cleaner.interval.is_zero() {
+ eprintln!(
+ "personal_access_token.cleaner.interval cannot be zero when
cleaner is enabled"
+ );
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -260,12 +248,12 @@ impl Validatable<ConfigurationError> for
PersonalAccessTokenConfig {
impl Validatable<ConfigurationError> for LoggingConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.level.is_empty() {
- error!("system.logging.level is supposed be configured");
+ eprintln!("system.logging.level is supposed be configured");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if self.retention.as_secs() < 1 {
- error!(
+ eprintln!(
"Configured system.logging.retention {} is less than minimum 1
second",
self.retention
);
@@ -273,7 +261,7 @@ impl Validatable<ConfigurationError> for LoggingConfig {
}
if self.rotation_check_interval.as_secs() < 1 {
- error!(
+ eprintln!(
"Configured system.logging.rotation_check_interval {} is less
than minimum 1 second",
self.rotation_check_interval
);
@@ -284,7 +272,7 @@ impl Validatable<ConfigurationError> for LoggingConfig {
if !max_total_size_unlimited
&& self.max_file_size.as_bytes_u64() >
self.max_total_size.as_bytes_u64()
{
- error!(
+ eprintln!(
"Configured system.logging.max_total_size {} is less than
system.logging.max_file_size {}",
self.max_total_size, self.max_file_size
);
@@ -298,7 +286,7 @@ impl Validatable<ConfigurationError> for LoggingConfig {
impl Validatable<ConfigurationError> for MemoryPoolConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.enabled && self.size == 0 {
- error!(
+ eprintln!(
"Configured system.memory_pool.enabled is true and
system.memory_pool.size is 0"
);
return Err(ConfigurationError::InvalidConfigurationValue);
@@ -309,7 +297,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
const DEFAULT_PAGE_SIZE: u64 = 4096;
if self.enabled && self.size < MIN_POOL_SIZE {
- error!(
+ eprintln!(
"Configured system.memory_pool.size {} B ({} MiB) is less than
minimum {} B, ({} MiB)",
self.size.as_bytes_u64(),
self.size.as_bytes_u64() / (1024 * 1024),
@@ -320,7 +308,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
}
if self.enabled &&
!self.size.as_bytes_u64().is_multiple_of(DEFAULT_PAGE_SIZE) {
- error!(
+ eprintln!(
"Configured system.memory_pool.size {} B is not a multiple of
default page size {} B",
self.size.as_bytes_u64(),
DEFAULT_PAGE_SIZE
@@ -329,7 +317,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
}
if self.enabled && self.bucket_capacity < MIN_BUCKET_CAPACITY {
- error!(
+ eprintln!(
"Configured system.memory_pool.buffers {} is less than minimum
{}",
self.bucket_capacity, MIN_BUCKET_CAPACITY
);
@@ -337,7 +325,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
}
if self.enabled && !self.bucket_capacity.is_power_of_two() {
- error!(
+ eprintln!(
"Configured system.memory_pool.buffers {} is not a power of 2",
self.bucket_capacity
);
@@ -358,11 +346,11 @@ impl Validatable<ConfigurationError> for ShardingConfig {
CpuAllocation::All => Ok(()),
CpuAllocation::Count(count) => {
if *count == 0 {
- error!("Invalid sharding configuration: cpu_allocation
count cannot be 0");
+ eprintln!("Invalid sharding configuration: cpu_allocation
count cannot be 0");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if *count > available_cpus {
- error!(
+ eprintln!(
"Invalid sharding configuration: cpu_allocation count
{count} exceeds available CPU cores {available_cpus}"
);
return Err(ConfigurationError::InvalidConfigurationValue);
@@ -371,13 +359,13 @@ impl Validatable<ConfigurationError> for ShardingConfig {
}
CpuAllocation::Range(start, end) => {
if start >= end {
- error!(
+ eprintln!(
"Invalid sharding configuration: cpu_allocation range
{start}..{end} is invalid (start must be less than end)"
);
return Err(ConfigurationError::InvalidConfigurationValue);
}
if *end > available_cpus {
- error!(
+ eprintln!(
"Invalid sharding configuration: cpu_allocation range
{start}..{end} exceeds available CPU cores (max: {available_cpus})"
);
return Err(ConfigurationError::InvalidConfigurationValue);
@@ -387,12 +375,12 @@ impl Validatable<ConfigurationError> for ShardingConfig {
CpuAllocation::NumaAware(numa_config) => match
NumaTopology::detect() {
// TODO: dry the validation, already validate it from the
shard allocation
Ok(topology) => numa_config.validate(&topology).map_err(|e| {
- error!("Invalid NUMA configuration: {}", e);
+ eprintln!("Invalid NUMA configuration: {}", e);
ConfigurationError::InvalidConfigurationValue
}),
Err(e) => {
- error!("Failed to detect NUMA topology: {}", e);
- error!("NUMA allocation requested but system doesn't
support it");
+ eprintln!("Failed to detect NUMA topology: {}", e);
+ eprintln!("NUMA allocation requested but system doesn't
support it");
Err(ConfigurationError::InvalidConfigurationValue)
}
},
@@ -408,13 +396,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
// Validate cluster name is not empty
if self.name.trim().is_empty() {
- error!("Invalid cluster configuration: cluster name cannot be
empty");
+ eprintln!("Invalid cluster configuration: cluster name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
// Validate current node name is not empty
if self.node.current.name.trim().is_empty() {
- error!("Invalid cluster configuration: current node name cannot be
empty");
+ eprintln!("Invalid cluster configuration: current node name cannot
be empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -424,7 +412,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
for node in &self.node.others {
if !node_names.insert(node.name.clone()) {
- error!(
+ eprintln!(
"Invalid cluster configuration: duplicate node name '{}'
found",
node.name
);
@@ -437,13 +425,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
for node in &self.node.others {
// Validate node name is not empty
if node.name.trim().is_empty() {
- error!("Invalid cluster configuration: node name cannot be
empty");
+ eprintln!("Invalid cluster configuration: node name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
// Validate IP is not empty
if node.ip.trim().is_empty() {
- error!(
+ eprintln!(
"Invalid cluster configuration: IP cannot be empty for
node '{}'",
node.name
);
@@ -461,7 +449,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
for (name, port_opt) in &port_list {
if let Some(port) = port_opt {
if *port == 0 {
- error!(
+ eprintln!(
"Invalid cluster configuration: {} port cannot be
0 for node '{}'",
name, node.name
);
@@ -471,7 +459,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
// Check for port conflicts across nodes on the same IP
let endpoint = format!("{}:{}:{}", node.ip, name, port);
if !used_endpoints.insert(endpoint.clone()) {
- error!(
+ eprintln!(
"Invalid cluster configuration: port conflict -
{}:{} is already used",
node.ip, port
);
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index cc2b3a75c..008495743 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -595,7 +595,7 @@ impl IggyShard {
return Ok(0);
}
- let (messages_writer, index_writer, segment_index) = {
+ let (messages_writer, index_writer) = {
let partitions = self.local_partitions.borrow();
let partition = partitions
.get(namespace)
@@ -605,7 +605,6 @@ impl IggyShard {
return Ok(0);
}
- let segment_index = partition.log.segments().len() - 1;
let messages_writer = partition
.log
.active_storage()
@@ -620,7 +619,7 @@ impl IggyShard {
.as_ref()
.expect("Index writer not initialized")
.clone();
- (messages_writer, index_writer, segment_index)
+ (messages_writer, index_writer)
};
let saved = messages_writer
@@ -633,6 +632,7 @@ impl IggyShard {
let partition = partitions
.get(namespace)
.expect("local_partitions: partition must exist");
+ let segment_index = partition.log.segments().len() - 1;
partition.log.indexes()[segment_index]
.as_ref()
.expect("indexes must exist for segment being persisted")
@@ -657,6 +657,9 @@ impl IggyShard {
.get_mut(namespace)
.expect("local_partitions: partition must exist");
+ // Recalculate index: segment deletion during async I/O shifts
indices
+ let segment_index = partition.log.segments().len() - 1;
+
let indexes = partition.log.indexes_mut()[segment_index]
.as_mut()
.expect("indexes must exist for segment being persisted");
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 8b32dce6c..4c3a04a85 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -241,11 +241,7 @@ impl IggyShard {
);
let start_offset = 0;
- let segment = Segment::new(
- start_offset,
- self.config.system.segment.size,
- self.config.system.segment.message_expiry,
- );
+ let segment = Segment::new(start_offset,
self.config.system.segment.size);
let storage = create_segment_storage(
&self.config.system,
diff --git a/core/server/src/shard/system/segments.rs
b/core/server/src/shard/system/segments.rs
index fb4a68496..38e91fd2d 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -121,8 +121,6 @@ impl IggyShard {
Ok(())
}
- /// Initialize a new segment in local_partitions.
- /// Used when partition data is in local_partitions (not slabs).
async fn init_log_in_local_partitions(
&self,
namespace: &IggyNamespace,
@@ -130,11 +128,7 @@ impl IggyShard {
use crate::streaming::segments::storage::create_segment_storage;
let start_offset = 0;
- let segment = Segment::new(
- start_offset,
- self.config.system.segment.size,
- self.config.system.segment.message_expiry,
- );
+ let segment = Segment::new(start_offset,
self.config.system.segment.size);
let storage = create_segment_storage(
&self.config.system,
@@ -161,6 +155,7 @@ impl IggyShard {
/// Rotate to a new segment when the current segment is full.
/// The new segment starts at the next offset after the current segment's
end.
+ /// Seals the old segment so it becomes eligible for expiry-based cleanup.
pub(crate) async fn rotate_segment_in_local_partitions(
&self,
namespace: &IggyNamespace,
@@ -168,18 +163,16 @@ impl IggyShard {
use crate::streaming::segments::storage::create_segment_storage;
let start_offset = {
- let partitions = self.local_partitions.borrow();
+ let mut partitions = self.local_partitions.borrow_mut();
let partition = partitions
- .get(namespace)
+ .get_mut(namespace)
.expect("rotate_segment: partition must exist");
- partition.log.active_segment().end_offset + 1
+ let active_segment = partition.log.active_segment_mut();
+ active_segment.sealed = true;
+ active_segment.end_offset + 1
};
- let segment = Segment::new(
- start_offset,
- self.config.system.segment.size,
- self.config.system.segment.message_expiry,
- );
+ let segment = Segment::new(start_offset,
self.config.system.segment.size);
let storage = create_segment_storage(
&self.config.system,
@@ -197,9 +190,9 @@ impl IggyShard {
partition.log.add_persisted_segment(segment, storage);
partition.stats.increment_segments_count(1);
tracing::info!(
- "Rotated to new segment at offset {} for partition {:?}",
+ "Rotated to new segment at offset {} for partition {}",
start_offset,
- namespace
+ namespace.partition_id()
);
}
Ok(())
diff --git a/core/server/src/shard/tasks/periodic/message_cleaner.rs
b/core/server/src/shard/tasks/periodic/message_cleaner.rs
index 62a0a7a24..5e33428c2 100644
--- a/core/server/src/shard/tasks/periodic/message_cleaner.rs
+++ b/core/server/src/shard/tasks/periodic/message_cleaner.rs
@@ -18,7 +18,7 @@
use crate::shard::IggyShard;
use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyError, IggyTimestamp, MaxTopicSize};
+use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
use std::rc::Rc;
use tracing::{debug, error, info, trace, warn};
@@ -52,7 +52,6 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
let namespaces = shard.get_current_shard_namespaces();
let now = IggyTimestamp::now();
- let delete_oldest_segments =
shard.config.system.topic.delete_oldest_segments;
let mut topics: std::collections::HashMap<(usize, usize), Vec<usize>> =
std::collections::HashMap::new();
@@ -73,8 +72,8 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
let mut topic_deleted_segments = 0u64;
let mut topic_deleted_messages = 0u64;
- for partition_id in partition_ids {
- // Handle expired segments
+ // Phase 1: Time-based expiry cleanup per partition
+ for &partition_id in &partition_ids {
let expired_result =
handle_expired_segments(&shard, stream_id, topic_id,
partition_id, now).await;
@@ -90,24 +89,22 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
);
}
}
+ }
- // Handle oldest segments if topic size management is enabled
- if delete_oldest_segments {
- let oldest_result =
- handle_oldest_segments(&shard, stream_id, topic_id,
partition_id).await;
-
- match oldest_result {
- Ok(deleted) => {
- topic_deleted_segments += deleted.segments_count;
- topic_deleted_messages += deleted.messages_count;
- }
- Err(err) => {
- error!(
- "Failed to clean oldest segments for stream ID:
{}, topic ID: {}, partition ID: {}. Error: {}",
- stream_id, topic_id, partition_id, err
- );
- }
- }
+ // Phase 2: Size-based cleanup at topic level (fair across partitions)
+ let size_result =
+ handle_size_based_cleanup(&shard, stream_id, topic_id,
&partition_ids).await;
+
+ match size_result {
+ Ok(deleted) => {
+ topic_deleted_segments += deleted.segments_count;
+ topic_deleted_messages += deleted.messages_count;
+ }
+ Err(err) => {
+ error!(
+ "Failed to clean segments by size for stream ID: {}, topic
ID: {}. Error: {}",
+ stream_id, topic_id, err
+ );
}
}
@@ -119,7 +116,6 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
total_deleted_segments += topic_deleted_segments;
total_deleted_messages += topic_deleted_messages;
- // Update metrics
shard
.metrics
.decrement_segments(topic_deleted_segments as u32);
@@ -148,6 +144,13 @@ struct DeletedSegments {
pub messages_count: u64,
}
+impl DeletedSegments {
+ fn add(&mut self, other: &DeletedSegments) {
+ self.segments_count += other.segments_count;
+ self.messages_count += other.messages_count;
+ }
+}
+
async fn handle_expired_segments(
shard: &Rc<IggyShard>,
stream_id: usize,
@@ -157,18 +160,29 @@ async fn handle_expired_segments(
) -> Result<DeletedSegments, IggyError> {
let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
- // Scope the borrow to avoid holding across await
+ let expiry = shard
+ .metadata
+ .get_topic_config(stream_id, topic_id)
+ .map(|(exp, _)| exp)
+ .unwrap_or(shard.config.system.topic.message_expiry);
+
+ if matches!(expiry, IggyExpiry::NeverExpire) {
+ return Ok(DeletedSegments::default());
+ }
+
let expired_segment_offsets: Vec<u64> = {
let partitions = shard.local_partitions.borrow();
let Some(partition) = partitions.get(&ns) else {
return Ok(DeletedSegments::default());
};
- partition
- .log
- .segments()
+ let segments = partition.log.segments();
+ let last_idx = segments.len().saturating_sub(1);
+
+ segments
.iter()
- .filter(|segment| segment.is_expired(now))
- .map(|segment| segment.start_offset)
+ .enumerate()
+ .filter(|(idx, segment)| *idx != last_idx &&
segment.is_expired(now, expiry))
+ .map(|(_, segment)| segment.start_offset)
.collect()
};
@@ -194,13 +208,15 @@ async fn handle_expired_segments(
.await
}
-async fn handle_oldest_segments(
+/// Handles size-based cleanup at the topic level.
+/// Deletes the globally oldest sealed segment across all partitions until
topic size is below 90% threshold.
+async fn handle_size_based_cleanup(
shard: &Rc<IggyShard>,
stream_id: usize,
topic_id: usize,
- partition_id: usize,
+ partition_ids: &[usize],
) -> Result<DeletedSegments, IggyError> {
- let Some((max_size, current_size)) = shard.metadata.with_metadata(|m| {
+ let Some((max_size, _)) = shard.metadata.with_metadata(|m| {
m.streams
.get(stream_id)
.and_then(|s| s.topics.get(topic_id))
@@ -209,58 +225,109 @@ async fn handle_oldest_segments(
return Ok(DeletedSegments::default());
};
- let is_unlimited = matches!(max_size, MaxTopicSize::Unlimited);
-
- if is_unlimited {
- debug!(
- "Topic is unlimited, oldest segments will not be deleted for
stream ID: {}, topic ID: {}, partition ID: {}",
- stream_id, topic_id, partition_id
- );
+ if matches!(max_size, MaxTopicSize::Unlimited) {
return Ok(DeletedSegments::default());
}
let max_bytes = max_size.as_bytes_u64();
- let is_almost_full = current_size >= (max_bytes * 9 / 10);
+ let threshold = max_bytes * 9 / 10;
+
+ let mut total_deleted = DeletedSegments::default();
+
+ loop {
+ let current_size = shard
+ .metadata
+ .with_metadata(|m| {
+ m.streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .map(|t| t.stats.size_bytes_inconsistent())
+ })
+ .unwrap_or(0);
+
+ if current_size < threshold {
+ break;
+ }
- if !is_almost_full {
- debug!(
- "Topic is not almost full, oldest segments will not be deleted for
stream ID: {}, topic ID: {}, partition ID: {}",
- stream_id, topic_id, partition_id
+ let Some((target_partition_id, target_offset, target_timestamp)) =
+ find_oldest_segment_in_shard(shard, stream_id, topic_id,
partition_ids)
+ else {
+ debug!(
+ "No deletable segments found for stream ID: {}, topic ID: {}
(all partitions have only active segment)",
+ stream_id, topic_id
+ );
+ break;
+ };
+
+ info!(
+ "Deleting oldest segment (start_offset: {}, timestamp: {}) from
partition {} for stream ID: {}, topic ID: {}",
+ target_offset, target_timestamp, target_partition_id, stream_id,
topic_id
);
- return Ok(DeletedSegments::default());
+
+ let deleted = delete_segments(
+ shard,
+ stream_id,
+ topic_id,
+ target_partition_id,
+ &[target_offset],
+ )
+ .await?;
+ total_deleted.add(&deleted);
+
+ if deleted.segments_count == 0 {
+ break;
+ }
}
- let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+ Ok(total_deleted)
+}
- // Scope the borrow to avoid holding across await
- let oldest_segment_offset = {
- let partitions = shard.local_partitions.borrow();
- partitions.get(&ns).and_then(|partition| {
- let segments = partition.log.segments();
- // Find the first closed segment (not the active one)
- if segments.len() > 1 {
- // The last segment is always active, so we look at earlier
ones
- segments.first().map(|s| s.start_offset)
- } else {
- None
- }
- })
- };
+/// Finds the oldest sealed segment across partitions owned by this shard.
+/// For each partition, the first segment in the vector is the oldest
(segments are ordered).
+/// Compares first segments across partitions by timestamp to ensure fair
deletion.
+/// Returns (partition_id, start_offset, start_timestamp) or None if no
deletable segments exist.
+fn find_oldest_segment_in_shard(
+ shard: &Rc<IggyShard>,
+ stream_id: usize,
+ topic_id: usize,
+ partition_ids: &[usize],
+) -> Option<(usize, u64, u64)> {
+ let partitions = shard.local_partitions.borrow();
- if let Some(start_offset) = oldest_segment_offset {
- info!(
- "Deleting oldest segment with start offset {} for stream ID: {},
topic ID: {}, partition ID: {}",
- start_offset, stream_id, topic_id, partition_id
- );
+ let mut oldest: Option<(usize, u64, u64)> = None;
- delete_segments(shard, stream_id, topic_id, partition_id,
&[start_offset]).await
- } else {
- debug!(
- "No closed segments found to delete for stream ID: {}, topic ID:
{}, partition ID: {}",
- stream_id, topic_id, partition_id
+ for &partition_id in partition_ids {
+ let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+ let Some(partition) = partitions.get(&ns) else {
+ continue;
+ };
+
+ let segments = partition.log.segments();
+ if segments.len() <= 1 {
+ continue;
+ }
+
+ // First segment is the oldest in this partition (segments are ordered
chronologically)
+ let first_segment = &segments[0];
+ if !first_segment.sealed {
+ continue;
+ }
+
+ let candidate = (
+ partition_id,
+ first_segment.start_offset,
+ first_segment.start_timestamp,
);
- Ok(DeletedSegments::default())
+ match &oldest {
+ None => oldest = Some(candidate),
+ Some((_, _, oldest_ts)) if first_segment.start_timestamp <
*oldest_ts => {
+ oldest = Some(candidate);
+ }
+ _ => {}
+ }
}
+
+ oldest
}
async fn delete_segments(
@@ -287,7 +354,6 @@ async fn delete_segments(
let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
- // Extract segments and storages to delete from local_partitions
let (stats, segments_to_delete, mut storages_to_delete) = {
let mut partitions = shard.local_partitions.borrow_mut();
let Some(partition) = partitions.get_mut(&ns) else {
@@ -334,7 +400,7 @@ async fn delete_segments(
let start_offset = segment.start_offset;
let end_offset = segment.end_offset;
- let approx_messages = if (end_offset - start_offset) == 0 {
+ let messages_in_segment = if start_offset == end_offset {
0
} else {
(end_offset - start_offset) + 1
@@ -362,14 +428,6 @@ async fn delete_segments(
} else {
trace!("Deleted index file: {}", path);
}
-
- let time_index_path = path.replace(".index", ".timeindex");
- if let Err(e) = compio::fs::remove_file(&time_index_path).await {
- trace!(
- "Could not delete time index file {}: {}",
- time_index_path, e
- );
- }
} else {
warn!(
"Index writer path not found for segment starting at offset
{}",
@@ -379,15 +437,15 @@ async fn delete_segments(
stats.decrement_size_bytes(segment_size);
stats.decrement_segments_count(1);
- stats.decrement_messages_count(messages_count);
+ stats.decrement_messages_count(messages_in_segment);
info!(
"Deleted segment with start offset {} (end: {}, size: {},
messages: {}) from partition ID: {}",
- start_offset, end_offset, segment_size, approx_messages,
partition_id
+ start_offset, end_offset, segment_size, messages_in_segment,
partition_id
);
segments_count += 1;
- messages_count += approx_messages;
+ messages_count += messages_in_segment;
}
Ok(DeletedSegments {
diff --git a/core/server/src/streaming/segments/segment.rs
b/core/server/src/streaming/segments/segment.rs
index fe653b44f..d73925b08 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -21,24 +21,22 @@ use std::fmt::Display;
#[derive(Default, Debug, Clone)]
pub struct Segment {
pub sealed: bool,
- pub message_expiry: IggyExpiry,
pub start_timestamp: u64,
pub end_timestamp: u64,
pub current_position: u32,
pub start_offset: u64,
pub end_offset: u64,
- pub size: IggyByteSize, // u64
- pub max_size: IggyByteSize, // u64
+ pub size: IggyByteSize,
+ pub max_size: IggyByteSize,
}
impl Display for Segment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "Segment {{ sealed: {}, max_size_bytes: {}, message_expiry: {:?},
start_timestamp: {}, end_timestamp: {}, start_offset: {}, end_offset: {}, size:
{} }}",
+ "Segment {{ sealed: {}, max_size: {}, start_timestamp: {},
end_timestamp: {}, start_offset: {}, end_offset: {}, size: {} }}",
self.sealed,
self.max_size,
- self.message_expiry,
self.start_timestamp,
self.end_timestamp,
self.start_offset,
@@ -49,16 +47,10 @@ impl Display for Segment {
}
impl Segment {
- /// Creates a new Segment with the specified parameters
- pub fn new(
- start_offset: u64,
- max_size_bytes: IggyByteSize,
- message_expiry: IggyExpiry,
- ) -> Self {
+ pub fn new(start_offset: u64, max_size_bytes: IggyByteSize) -> Self {
Self {
sealed: false,
max_size: max_size_bytes,
- message_expiry,
start_timestamp: 0,
end_timestamp: 0,
start_offset,
@@ -69,24 +61,19 @@ impl Segment {
}
pub fn is_full(&self) -> bool {
- if self.size >= self.max_size {
- return true;
- }
-
- self.is_expired(IggyTimestamp::now())
+ self.size >= self.max_size
}
- pub fn is_expired(&self, now: IggyTimestamp) -> bool {
+ pub fn is_expired(&self, now: IggyTimestamp, expiry: IggyExpiry) -> bool {
if !self.sealed {
return false;
}
- match self.message_expiry {
+ match expiry {
IggyExpiry::NeverExpire => false,
IggyExpiry::ServerDefault => false,
- IggyExpiry::ExpireDuration(expiry) => {
- let last_message_timestamp = self.end_timestamp;
- last_message_timestamp + expiry.as_micros() <= now.as_micros()
+ IggyExpiry::ExpireDuration(duration) => {
+ self.end_timestamp + duration.as_micros() <= now.as_micros()
}
}
}