This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 202fda739 fix(server): properly cleanup messages in message cleaner 
(#2670)
202fda739 is described below

commit 202fda7396b1500512403e4c7ddda3ba52b6875e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 4 16:57:52 2026 +0100

    fix(server): properly cleanup messages in message cleaner (#2670)
    
    Segments created after server startup were using the server's
    default message_expiry (NeverExpire) rather than the topic's
    configured value, causing messages to never be cleaned up even
    when expiry was set.
    
    - Propagate topic's message_expiry to all segment creation sites
    - Move message_expiry config from [system.segment] to [system.topic]
    - Remove delete_oldest_segments toggle
    - Seal segments on rotation so is_expired() can evaluate them
    - Add comprehensive tests for time-based, size-based, and combined retention policies
    
    Closes #2629.
    
    ---------
    
    Co-authored-by: Grzegorz Koszyk 
<[email protected]>
---
 core/common/src/utils/duration.rs                  |   2 +-
 core/integration/tests/config_provider/mod.rs      |   9 +-
 core/integration/tests/server/message_cleanup.rs   | 124 +++++
 core/integration/tests/server/mod.rs               |   1 +
 .../server/scenarios/message_cleanup_scenario.rs   | 588 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/integration/tests/server/specific.rs          |   6 -
 core/server/config.toml                            |  66 +--
 core/server/src/bootstrap.rs                       |  19 +-
 core/server/src/configs/defaults.rs                |   3 +-
 core/server/src/configs/displays.rs                |   8 +-
 core/server/src/configs/system.rs                  |   9 +-
 core/server/src/configs/validators.rs              | 110 ++--
 core/server/src/shard/system/messages.rs           |   9 +-
 core/server/src/shard/system/partitions.rs         |   6 +-
 core/server/src/shard/system/segments.rs           |  27 +-
 .../src/shard/tasks/periodic/message_cleaner.rs    | 224 +++++---
 core/server/src/streaming/segments/segment.rs      |  31 +-
 18 files changed, 974 insertions(+), 269 deletions(-)

diff --git a/core/common/src/utils/duration.rs 
b/core/common/src/utils/duration.rs
index aa6c796a7..f8c8de92e 100644
--- a/core/common/src/utils/duration.rs
+++ b/core/common/src/utils/duration.rs
@@ -101,7 +101,7 @@ impl IggyDuration {
     }
 
     pub fn is_zero(&self) -> bool {
-        self.duration.as_secs() == 0
+        self.duration.is_zero()
     }
 
     pub fn abs_diff(&self, other: IggyDuration) -> IggyDuration {
diff --git a/core/integration/tests/config_provider/mod.rs 
b/core/integration/tests/config_provider/mod.rs
index d96ba5fc7..adf67eac5 100644
--- a/core/integration/tests/config_provider/mod.rs
+++ b/core/integration/tests/config_provider/mod.rs
@@ -43,10 +43,7 @@ async fn validate_config_env_override() {
             "IGGY_MESSAGE_SAVER_ENABLED",
             expected_message_saver.to_string(),
         );
-        env::set_var(
-            "IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY",
-            expected_message_expiry,
-        );
+        env::set_var("IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY", 
expected_message_expiry);
     }
 
     let config_path = get_root_path().join("../server/config.toml");
@@ -61,7 +58,7 @@ async fn validate_config_env_override() {
     assert_eq!(config.tcp.enabled, expected_tcp);
     assert_eq!(config.message_saver.enabled, expected_message_saver);
     assert_eq!(
-        config.system.segment.message_expiry.to_string(),
+        config.system.topic.message_expiry.to_string(),
         expected_message_expiry
     );
 
@@ -69,7 +66,7 @@ async fn validate_config_env_override() {
         env::remove_var("IGGY_HTTP_ENABLED");
         env::remove_var("IGGY_TCP_ENABLED");
         env::remove_var("IGGY_MESSAGE_SAVER_ENABLED");
-        env::remove_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY");
+        env::remove_var("IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY");
     }
 }
 
diff --git a/core/integration/tests/server/message_cleanup.rs 
b/core/integration/tests/server/message_cleanup.rs
new file mode 100644
index 000000000..c61855d83
--- /dev/null
+++ b/core/integration/tests/server/message_cleanup.rs
@@ -0,0 +1,124 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::message_cleanup_scenario;
+use iggy::prelude::*;
+use integration::harness::{TestHarness, TestServerConfig};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::future::Future;
+use std::path::Path;
+use std::pin::Pin;
+use test_case::test_matrix;
+
+type CleanupScenarioFn =
+    for<'a> fn(&'a IggyClient, &'a Path) -> Pin<Box<dyn Future<Output = ()> + 
'a>>;
+
+fn expiry_after_rotation() -> CleanupScenarioFn {
+    |client, path| {
+        Box::pin(message_cleanup_scenario::run_expiry_after_rotation(
+            client, path,
+        ))
+    }
+}
+
+fn active_segment_protection() -> CleanupScenarioFn {
+    |client, path| {
+        Box::pin(message_cleanup_scenario::run_active_segment_protection(
+            client, path,
+        ))
+    }
+}
+
+fn size_based_retention() -> CleanupScenarioFn {
+    |client, path| {
+        Box::pin(message_cleanup_scenario::run_size_based_retention(
+            client, path,
+        ))
+    }
+}
+
+fn combined_retention() -> CleanupScenarioFn {
+    |client, path| {
+        Box::pin(message_cleanup_scenario::run_combined_retention(
+            client, path,
+        ))
+    }
+}
+
+fn expiry_multipartition() -> CleanupScenarioFn {
+    |client, path| {
+        
Box::pin(message_cleanup_scenario::run_expiry_with_multiple_partitions(client, 
path))
+    }
+}
+
+fn fair_size_cleanup_multipartition() -> CleanupScenarioFn {
+    |client, path| {
+        
Box::pin(message_cleanup_scenario::run_fair_size_based_cleanup_multipartition(client,
 path))
+    }
+}
+
+async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
+    let mut harness = TestHarness::builder()
+        .server(
+            TestServerConfig::builder()
+                .extra_envs(HashMap::from([
+                    ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), 
"100KiB".to_string()),
+                    (
+                        
"IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED".to_string(),
+                        "true".to_string(),
+                    ),
+                    (
+                        "IGGY_DATA_MAINTENANCE_MESSAGES_INTERVAL".to_string(),
+                        "100ms".to_string(),
+                    ),
+                    (
+                        
"IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+                        "1".to_string(),
+                    ),
+                    (
+                        "IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_string(),
+                        "true".to_string(),
+                    ),
+                ]))
+                .build(),
+        )
+        .build()
+        .unwrap();
+
+    harness.start().await.unwrap();
+
+    let client = harness.tcp_root_client().await.unwrap();
+    let data_path = harness.server().data_path();
+
+    scenario(&client, &data_path).await;
+}
+
+#[test_matrix([
+    expiry_after_rotation(),
+    active_segment_protection(),
+    size_based_retention(),
+    combined_retention(),
+    expiry_multipartition(),
+    fair_size_cleanup_multipartition(),
+])]
+#[tokio::test]
+#[parallel]
+async fn message_cleanup(scenario: CleanupScenarioFn) {
+    run_cleanup_scenario(scenario).await;
+}
diff --git a/core/integration/tests/server/mod.rs 
b/core/integration/tests/server/mod.rs
index de58d27cc..376746436 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -19,6 +19,7 @@
 mod cg;
 mod concurrent_addition;
 mod general;
+mod message_cleanup;
 mod message_retrieval;
 mod scenarios;
 mod specific;
diff --git 
a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs 
b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
new file mode 100644
index 000000000..066953153
--- /dev/null
+++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for message retention policies (time-based and size-based).
+//!
+//! Configuration: 100KB segment size, 100ms cleaner interval, instant flush.
+//! Message size: 64B header + 936B payload = 1KB per message.
+//! Therefore: 100 messages = 1 segment, 101+ messages = 2+ segments.
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use iggy_common::IggyByteSize;
+use std::fs::{DirEntry, read_dir};
+use std::path::Path;
+use std::time::Duration;
+
+const STREAM_NAME: &str = "test_expiry_stream";
+const TOPIC_NAME: &str = "test_expiry_topic";
+const PARTITION_ID: u32 = 0;
+const LOG_EXTENSION: &str = "log";
+
+/// Payload size chosen so that header (64B) + payload = 1KB per message.
+const PAYLOAD_SIZE: usize = 936;
+
+/// Buffer time for cleaner to run after expiry conditions are met.
+const CLEANER_BUFFER: Duration = Duration::from_millis(300);
+
+fn make_payload(fill: char) -> Bytes {
+    Bytes::from(fill.to_string().repeat(PAYLOAD_SIZE))
+}
+
+/// Tests time-based retention: segments are cleaned up after expiry.
+pub async fn run_expiry_after_rotation(client: &IggyClient, data_path: &Path) {
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let expiry = Duration::from_secs(2);
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let partition_path = data_path
+        .join(format!(
+            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+        ))
+        .display()
+        .to_string();
+
+    // Send 110 messages (1KB each) to create 2 segments (100KB segment size)
+    let payload = make_payload('A');
+    let total_messages = 110;
+
+    for i in 0..total_messages {
+        let message = IggyMessage::builder()
+            .id(i as u128)
+            .payload(payload.clone())
+            .build()
+            .unwrap();
+
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    let initial_segments = get_segment_paths_for_partition(&partition_path);
+    let initial_count = initial_segments.len();
+    assert!(
+        initial_count >= 2,
+        "Expected at least 2 segments but got {}",
+        initial_count
+    );
+
+    // Verify we can poll all messages before expiry
+    let polled_before = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            total_messages as u32,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(
+        polled_before.messages.len(),
+        total_messages as usize,
+        "Should poll all messages before expiry"
+    );
+
+    // Wait for expiry + cleaner
+    tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+    let remaining_segments = get_segment_paths_for_partition(&partition_path);
+    assert!(
+        remaining_segments.len() < initial_count,
+        "Expected segments to be deleted after expiry"
+    );
+    assert!(
+        !remaining_segments.is_empty(),
+        "Active segment should not be deleted"
+    );
+
+    // Verify fewer messages available after cleanup
+    let polled_after = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            total_messages as u32,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert!(
+        polled_after.messages.len() < polled_before.messages.len(),
+        "Expected fewer messages after cleanup"
+    );
+
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+/// Tests that the active segment is never deleted, even if expired.
+pub async fn run_active_segment_protection(client: &IggyClient, data_path: 
&Path) {
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let expiry = Duration::from_secs(1);
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let partition_path = data_path
+        .join(format!(
+            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+        ))
+        .display()
+        .to_string();
+
+    // Send one small message (stays in active segment, no rotation)
+    let message = IggyMessage::builder()
+        .id(1u128)
+        .payload(Bytes::from("small"))
+        .build()
+        .unwrap();
+
+    let mut messages = vec![message];
+    client
+        .send_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Partitioning::partition_id(PARTITION_ID),
+            &mut messages,
+        )
+        .await
+        .unwrap();
+
+    let initial_segments = get_segment_paths_for_partition(&partition_path);
+    assert_eq!(initial_segments.len(), 1, "Should have exactly 1 segment");
+
+    // Wait for expiry + cleaner
+    tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+    let remaining_segments = get_segment_paths_for_partition(&partition_path);
+    assert_eq!(
+        remaining_segments.len(),
+        1,
+        "Active segment should NOT be deleted even after expiry"
+    );
+
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+/// Tests size-based retention: oldest segments deleted when topic exceeds 
max_size.
+pub async fn run_size_based_retention(client: &IggyClient, data_path: &Path) {
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    // 150KB max, cleanup at 90% = 135KB. With 100KB segments, exceeding 135KB 
triggers cleanup.
+    let max_size_bytes = 150 * 1024;
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::Custom(IggyByteSize::from(max_size_bytes)),
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let partition_path = data_path
+        .join(format!(
+            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+        ))
+        .display()
+        .to_string();
+
+    // Send 160 messages (160KB) to exceed 90% threshold (135KB)
+    let payload = make_payload('B');
+    let total_messages = 160;
+
+    for i in 0..total_messages {
+        let message = IggyMessage::builder()
+            .id(i as u128)
+            .payload(payload.clone())
+            .build()
+            .unwrap();
+
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    // Wait for cleaner
+    tokio::time::sleep(CLEANER_BUFFER).await;
+
+    let remaining_segments = get_segment_paths_for_partition(&partition_path);
+
+    // Verify oldest messages deleted (first offset > 0)
+    let polled = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            total_messages as u32,
+            false,
+        )
+        .await
+        .unwrap();
+
+    let first_offset = polled
+        .messages
+        .first()
+        .map(|m| m.header.offset)
+        .unwrap_or(0);
+
+    assert!(
+        first_offset > 0,
+        "Oldest messages should be deleted, first_offset should be > 0"
+    );
+    assert!(
+        polled.messages.len() < total_messages as usize,
+        "Some messages should be deleted"
+    );
+    assert!(
+        !remaining_segments.is_empty(),
+        "Active segment should not be deleted"
+    );
+
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+/// Tests both retention policies together: time-based AND size-based.
+pub async fn run_combined_retention(client: &IggyClient, data_path: &Path) {
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let expiry = Duration::from_secs(2);
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+            MaxTopicSize::Custom(IggyByteSize::from(500 * 1024)), // 500KB 
(won't trigger)
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let partition_path = data_path
+        .join(format!(
+            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+        ))
+        .display()
+        .to_string();
+
+    // Send 110 messages to create 2 segments (under size threshold, but will 
expire)
+    let payload = make_payload('C');
+    for i in 0..110 {
+        let message = IggyMessage::builder()
+            .id(i as u128)
+            .payload(payload.clone())
+            .build()
+            .unwrap();
+
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    let initial_segments = get_segment_paths_for_partition(&partition_path);
+    let initial_count = initial_segments.len();
+    assert!(initial_count >= 2, "Expected at least 2 segments");
+
+    // Wait for time-based expiry
+    tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+    let remaining_segments = get_segment_paths_for_partition(&partition_path);
+    assert!(
+        remaining_segments.len() < initial_count,
+        "Segments should be deleted after expiry"
+    );
+    assert!(
+        !remaining_segments.is_empty(),
+        "Active segment should not be deleted"
+    );
+
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+/// Tests time-based retention with multiple partitions.
+pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, 
data_path: &Path) {
+    const PARTITIONS_COUNT: u32 = 3;
+
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    let expiry = Duration::from_secs(3);
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let payload = make_payload('D');
+    let messages_per_partition = 110;
+
+    // Send messages to all partitions
+    for partition_id in 0..PARTITIONS_COUNT {
+        for i in 0..messages_per_partition {
+            let msg_id = partition_id as u128 * 1000 + i as u128;
+            let message = IggyMessage::builder()
+                .id(msg_id)
+                .payload(payload.clone())
+                .build()
+                .unwrap();
+
+            let mut messages = vec![message];
+            client
+                .send_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    &Partitioning::partition_id(partition_id),
+                    &mut messages,
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    // Collect initial segment counts
+    let mut initial_counts: Vec<usize> = Vec::new();
+    for partition_id in 0..PARTITIONS_COUNT {
+        let partition_path = data_path
+            .join(format!(
+                
"streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}"
+            ))
+            .display()
+            .to_string();
+        let segments = get_segment_paths_for_partition(&partition_path);
+        initial_counts.push(segments.len());
+        assert!(
+            segments.len() >= 2,
+            "Partition {} should have at least 2 segments, got {}",
+            partition_id,
+            segments.len()
+        );
+    }
+
+    // Wait for expiry + cleaner
+    tokio::time::sleep(expiry + CLEANER_BUFFER).await;
+
+    // Verify cleanup in all partitions
+    let mut total_deleted = 0usize;
+    for partition_id in 0..PARTITIONS_COUNT {
+        let partition_path = data_path
+            .join(format!(
+                
"streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}"
+            ))
+            .display()
+            .to_string();
+        let remaining = get_segment_paths_for_partition(&partition_path);
+        let deleted = initial_counts[partition_id as 
usize].saturating_sub(remaining.len());
+        total_deleted += deleted;
+
+        assert!(
+            !remaining.is_empty(),
+            "Partition {} should retain active segment",
+            partition_id
+        );
+    }
+
+    assert!(
+        total_deleted > 0,
+        "At least some segments should be deleted"
+    );
+
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+/// Tests fair size-based cleanup across multiple partitions.
+pub async fn run_fair_size_based_cleanup_multipartition(client: &IggyClient, 
data_path: &Path) {
+    const PARTITIONS_COUNT: u32 = 3;
+
+    let stream = client.create_stream(STREAM_NAME).await.unwrap();
+    let stream_id = stream.id;
+
+    // 200KB max, cleanup at 90% = 180KB
+    let max_size_bytes = 200 * 1024;
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::Custom(IggyByteSize::from(max_size_bytes)),
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let payload = make_payload('E');
+
+    // Send 70 messages per partition = 210KB total, exceeds 180KB threshold
+    for partition_id in 0..PARTITIONS_COUNT {
+        for i in 0..70 {
+            let msg_id = partition_id as u128 * 1000 + i as u128;
+            let message = IggyMessage::builder()
+                .id(msg_id)
+                .payload(payload.clone())
+                .build()
+                .unwrap();
+
+            let mut messages = vec![message];
+            client
+                .send_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    &Partitioning::partition_id(partition_id),
+                    &mut messages,
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    // Wait for cleaner
+    tokio::time::sleep(CLEANER_BUFFER).await;
+
+    // Verify segments exist for all partitions
+    for partition_id in 0..PARTITIONS_COUNT {
+        let partition_path = data_path
+            .join(format!(
+                
"streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}"
+            ))
+            .display()
+            .to_string();
+        let segments = get_segment_paths_for_partition(&partition_path);
+        assert!(
+            !segments.is_empty(),
+            "Partition {} should have at least 1 segment",
+            partition_id
+        );
+    }
+
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+fn get_segment_paths_for_partition(partition_path: &str) -> Vec<DirEntry> {
+    read_dir(partition_path)
+        .map(|read_dir| {
+            read_dir
+                .filter_map(|dir_entry| {
+                    dir_entry
+                        .map(|dir_entry| {
+                            match dir_entry
+                                .path()
+                                .extension()
+                                .is_some_and(|ext| ext == LOG_EXTENSION)
+                            {
+                                true => Some(dir_entry),
+                                false => None,
+                            }
+                        })
+                        .ok()
+                        .flatten()
+                })
+                .collect::<Vec<_>>()
+        })
+        .unwrap_or_default()
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index b04866f37..90d45676f 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -30,6 +30,7 @@ pub mod cross_protocol_pat_scenario;
 pub mod delete_segments_scenario;
 pub mod encryption_scenario;
 pub mod log_rotation_scenario;
+pub mod message_cleanup_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod offset_scenario;
diff --git a/core/integration/tests/server/specific.rs 
b/core/integration/tests/server/specific.rs
index afd4a3a0c..501099a83 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -241,28 +241,22 @@ async fn 
should_handle_single_message_per_batch_with_delayed_persistence() {
 async fn segment_rotation_scenario() {
     let mut extra_envs = HashMap::new();
 
-    // Very small segment to trigger frequent rotations
     extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), 
"512B".to_string());
 
-    // Short message saver interval to add concurrent persist operations
     extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), 
"1s".to_string());
 
-    // Small threshold to trigger more frequent saves
     extra_envs.insert(
         "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
         "32".to_string(),
     );
 
-    // cache_indexes = none triggers clear_active_indexes in 
handle_full_segment
     extra_envs.insert(
         "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
         "none".to_string(),
     );
 
-    // Disable socket migration to keep all connections on same shard
     extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), 
"false".to_string());
 
-    // Enable TCP nodelay for faster message throughput
     extra_envs.insert(
         "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
         "true".to_string(),
diff --git a/core/server/config.toml b/core/server/config.toml
index 3949b9cd5..372d9da94 100644
--- a/core/server/config.toml
+++ b/core/server/config.toml
@@ -415,25 +415,28 @@ default_algorithm = "none"
 # Specifies the directory where stream data is stored, relative to 
`system.path`.
 path = "streams"
 
-# Topic configuration
+# Topic configuration - default settings for new topics
 [system.topic]
-# Path for storing topic-related data (string).
-# Specifies the directory where topic data is stored, relative to 
`stream.path`.
+# Path for storing topic-related data, relative to `stream.path`.
 path = "topics"
 
-# Configures the topic size-based expiry setting.
-# "unlimited" or "0" means topics are kept indefinitely.
-# A size value in human-readable format determines the maximum size of a topic.
-# When a topic reaches this size, the oldest messages are deleted to make room 
for new ones.
-# Messages are removed in full segments, so if segment size is 1 GiB and the 
topic size is 10 GiB,
-# the oldest segment will be deleted upon reaching 10 GiB.
-# Example: `max_topic_size = "10 GiB"` means oldest messages in topics will be 
deleted when they reach 10 GiB.
-# Note: this setting can be overwritten with CreateTopic and UpdateTopic 
requests.
+# Messages can be deleted based on two independent policies:
+# 1. Size-based: delete oldest segments when topic exceeds max_size
+# 2. Time-based: delete segments older than message_expiry
+# Both can be active simultaneously. Per-topic overrides via 
CreateTopic/UpdateTopic.
+
+# Maximum topic size before oldest segments are deleted.
+# "unlimited" or "0" = no size limit (messages kept indefinitely).
+# When 90% of this limit is reached, oldest segments are removed to make room.
+# Applies to sealed segments only (active segment is protected).
+# Example: "10 GiB"
 max_size = "unlimited"
 
-# Configures whether the oldest segments are deleted when a topic reaches its 
maximum size (boolean).
-# Note: segments are removed in intervals defined by 
`system.message_cleaner.interval`.
-delete_oldest_segments = false
+# Maximum age of messages before segments are deleted.
+# "none" = no time limit (messages kept indefinitely).
+# Applies to sealed segments only (active segment is protected).
+# Example: "7 days", "2 days 4 hours 15 minutes"
+message_expiry = "none"
 
 # Partition configuration
 [system.partition]
@@ -451,23 +454,15 @@ enforce_fsync = false
 # `false` skips these checks for faster loading at the risk of undetected 
corruption.
 validate_checksum = false
 
-# The count threshold of buffered messages before triggering a save to disk 
(integer).
-# Specifies how many messages accumulate before persisting to storage.
-# Adjusting this can balance between write performance and data durability.
-# This is soft limit, actual number of messages may be higher, depending on 
last batch size.
-# Together with `size_of_messages_required_to_save` it defines the threshold 
of buffered messages.
-# Minimum value is 32. Value has to be a multiple of 32 due to minimum
-# direct I/O block size (512 bytes) and message index size (16 bytes per 
message).
-# With direct I/O, writes must occur in blocks of at least 512 bytes, which 
equals 32 message indices.
+# The count threshold of buffered messages before triggering a save to disk.
+# Together with `size_of_messages_required_to_save` it defines the threshold.
+# This is a soft limit - actual count may be higher depending on last batch size.
+# Minimum value is 1.
 messages_required_to_save = 1024
 
-# The size threshold of buffered messages before triggering a save to disk 
(string).
-# Specifies how much size of messages accumulate before persisting to storage.
-# Adjusting this can balance between write performance and data durability.
-# This is soft limit, actual number of messages may be higher, depending on 
last batch size.
-# Together with `messages_required_to_save` it defines the threshold of 
buffered messages.
-# Minimum value is 512 B. Value has to be a multiple of 512 B due to direct 
I/O requirements.
-# Direct I/O operations must align with the underlying storage block size 
(typically 512 B or 4 KiB).
+# The size threshold of buffered messages before triggering a save to disk.
+# Together with `messages_required_to_save` it defines the threshold.
+# This is a soft limit - actual size may be higher depending on last batch size.
 size_of_messages_required_to_save = "1 MiB"
 
 # Segment configuration
@@ -478,19 +473,6 @@ size_of_messages_required_to_save = "1 MiB"
 # Maximum size is 1 GiB. Size has to be a multiple of 512 B.
 size = "1 GiB"
 
-# Configures the message time-based expiry setting.
-# "none" means messages are kept indefinitely.
-# A time value in human-readable format determines the lifespan of messages.
-# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will 
expire after that duration.
-message_expiry = "none"
-
-# Defines the file system confirmation behavior during state updates.
-# Controls how the system waits for file write operations to complete.
-# Possible values:
-# - "wait": waits for the file operation to complete before proceeding.
-# - "no_wait": proceeds without waiting for the file operation to finish, 
potentially increasing performance but at the cost of durability.
-server_confirmation = "wait"
-
 # Configures whether expired segments are archived (boolean) or just deleted 
without archiving.
 archive_expired = false
 
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 70703e43b..a73281637 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -250,7 +250,6 @@ pub async fn load_segments(
 
         let messages_file_path = format!("{}/{}.{}", partition_path, 
log_file_name, LOG_EXTENSION);
         let index_file_path = format!("{}/{}.{}", partition_path, 
log_file_name, INDEX_EXTENSION);
-        let time_index_path = index_file_path.replace(INDEX_EXTENSION, 
"timeindex");
 
         async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
             match compio::fs::metadata(path).await {
@@ -263,13 +262,12 @@ pub async fn load_segments(
         }
 
         let index_path_exists = try_exists(&index_file_path).await.unwrap();
-        let time_index_path_exists = 
try_exists(&time_index_path).await.unwrap();
         let index_cache_enabled = matches!(
             config.segment.cache_indexes,
             CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
         );
 
-        if index_cache_enabled && (!index_path_exists || 
time_index_path_exists) {
+        if index_cache_enabled && !index_path_exists {
             warn!(
                 "Index at path {} does not exist, rebuilding it based on 
{}...",
                 index_file_path, messages_file_path
@@ -293,10 +291,6 @@ pub async fn load_segments(
             );
         }
 
-        if time_index_path_exists {
-            compio::fs::remove_file(&time_index_path).await.unwrap();
-        }
-
         let messages_metadata = compio::fs::metadata(&messages_file_path)
             .await
             .map_err(|_| IggyError::CannotReadPartitions)?;
@@ -345,11 +339,7 @@ pub async fn load_segments(
             )
         };
 
-        let mut segment = Segment::new(
-            start_offset,
-            config.segment.size,
-            config.segment.message_expiry,
-        );
+        let mut segment = Segment::new(start_offset, config.segment.size);
 
         segment.start_timestamp = start_timestamp;
         segment.end_timestamp = end_timestamp;
@@ -438,6 +428,11 @@ pub async fn load_segments(
         }
     }
 
+    // The last segment is the active one and must remain unsealed for writes
+    if log.has_segments() {
+        log.segments_mut().last_mut().unwrap().sealed = false;
+    }
+
     if matches!(
         config.segment.cache_indexes,
         CacheIndexesConfig::OpenSegment
diff --git a/core/server/src/configs/defaults.rs 
b/core/server/src/configs/defaults.rs
index ce9a84ab4..90979caa4 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -443,7 +443,7 @@ impl Default for TopicConfig {
         TopicConfig {
             path: SERVER_CONFIG.system.topic.path.parse().unwrap(),
             max_size: SERVER_CONFIG.system.topic.max_size.parse().unwrap(),
-            delete_oldest_segments: 
SERVER_CONFIG.system.topic.delete_oldest_segments,
+            message_expiry: 
SERVER_CONFIG.system.topic.message_expiry.parse().unwrap(),
         }
     }
 }
@@ -471,7 +471,6 @@ impl Default for SegmentConfig {
         SegmentConfig {
             size: SERVER_CONFIG.system.segment.size.parse().unwrap(),
             cache_indexes: 
SERVER_CONFIG.system.segment.cache_indexes.parse().unwrap(),
-            message_expiry: 
SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(),
             archive_expired: SERVER_CONFIG.system.segment.archive_expired,
         }
     }
diff --git a/core/server/src/configs/displays.rs 
b/core/server/src/configs/displays.rs
index 515011c25..64faa95b7 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -205,8 +205,8 @@ impl Display for TopicConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ path: {}, max_size: {}, delete_oldest_segments: {} }}",
-            self.path, self.max_size, self.delete_oldest_segments
+            "{{ path: {}, max_size: {}, message_expiry: {} }}",
+            self.path, self.max_size, self.message_expiry
         )
     }
 }
@@ -239,8 +239,8 @@ impl Display for SegmentConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, 
archive_expired: {} }}",
-            self.size, self.cache_indexes, self.message_expiry, 
self.archive_expired,
+            "{{ size_bytes: {}, cache_indexes: {}, archive_expired: {} }}",
+            self.size, self.cache_indexes, self.archive_expired,
         )
     }
 }
diff --git a/core/server/src/configs/system.rs 
b/core/server/src/configs/system.rs
index 8e1205967..58de89ed5 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -119,7 +119,9 @@ pub struct TopicConfig {
     #[config_env(leaf)]
     #[serde_as(as = "DisplayFromStr")]
     pub max_size: MaxTopicSize,
-    pub delete_oldest_segments: bool,
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub message_expiry: IggyExpiry,
 }
 
 #[derive(Debug, Deserialize, Serialize, ConfigEnv)]
@@ -154,9 +156,6 @@ pub struct SegmentConfig {
     pub size: IggyByteSize,
     #[config_env(leaf)]
     pub cache_indexes: CacheIndexesConfig,
-    #[config_env(leaf)]
-    #[serde_as(as = "DisplayFromStr")]
-    pub message_expiry: IggyExpiry,
     pub archive_expired: bool,
 }
 
@@ -328,7 +327,7 @@ impl SystemConfig {
 
     pub fn resolve_message_expiry(&self, message_expiry: IggyExpiry) -> 
IggyExpiry {
         match message_expiry {
-            IggyExpiry::ServerDefault => self.segment.message_expiry,
+            IggyExpiry::ServerDefault => self.topic.message_expiry,
             _ => message_expiry,
         }
     }
diff --git a/core/server/src/configs/validators.rs 
b/core/server/src/configs/validators.rs
index ceb0fe30d..b9d70aaf3 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -35,7 +35,7 @@ use iggy_common::IggyExpiry;
 use iggy_common::MaxTopicSize;
 use iggy_common::Validatable;
 use std::thread::available_parallelism;
-use tracing::{error, warn};
+use tracing::warn;
 
 impl Validatable<ConfigurationError> for ServerConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
@@ -92,20 +92,30 @@ impl Validatable<ConfigurationError> for ServerConfig {
         let topic_size = match self.system.topic.max_size {
             MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
             MaxTopicSize::Unlimited => Ok(u64::MAX),
-            MaxTopicSize::ServerDefault => 
Err(ConfigurationError::InvalidConfigurationValue),
+            MaxTopicSize::ServerDefault => {
+                eprintln!("system.topic.max_size cannot be ServerDefault in server config");
+                Err(ConfigurationError::InvalidConfigurationValue)
+            }
         }?;
 
-        if let IggyExpiry::ServerDefault = self.system.segment.message_expiry {
+        if let IggyExpiry::ServerDefault = self.system.topic.message_expiry {
+            eprintln!("system.topic.message_expiry cannot be ServerDefault in server config");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         if self.http.enabled
             && let IggyExpiry::ServerDefault = 
self.http.jwt.access_token_expiry
         {
+            eprintln!("http.jwt.access_token_expiry cannot be ServerDefault 
when HTTP is enabled");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         if topic_size < self.system.segment.size.as_bytes_u64() {
+            eprintln!(
+                "system.topic.max_size ({} B) must be >= system.segment.size 
({} B)",
+                topic_size,
+                self.system.segment.size.as_bytes_u64()
+            );
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -134,14 +144,17 @@ impl Validatable<ConfigurationError> for TelemetryConfig {
         }
 
         if self.service_name.trim().is_empty() {
+            eprintln!("telemetry.service_name cannot be empty when telemetry 
is enabled");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         if self.logs.endpoint.is_empty() {
+            eprintln!("telemetry.logs.endpoint cannot be empty when telemetry 
is enabled");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         if self.traces.endpoint.is_empty() {
+            eprintln!("telemetry.traces.endpoint cannot be empty when 
telemetry is enabled");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -151,39 +164,8 @@ impl Validatable<ConfigurationError> for TelemetryConfig {
 
 impl Validatable<ConfigurationError> for PartitionConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
-        if self.messages_required_to_save < 32 {
-            error!(
-                "Configured system.partition.messages_required_to_save {} is 
less than minimum {}",
-                self.messages_required_to_save, 32
-            );
-            return Err(ConfigurationError::InvalidConfigurationValue);
-        }
-
-        if !self.messages_required_to_save.is_multiple_of(32) {
-            error!(
-                "Configured system.partition.messages_required_to_save {} is 
not a multiple of 32",
-                self.messages_required_to_save
-            );
-            return Err(ConfigurationError::InvalidConfigurationValue);
-        }
-
-        if self.size_of_messages_required_to_save < 512 {
-            error!(
-                "Configured system.partition.size_of_messages_required_to_save 
{} is less than minimum {}",
-                self.size_of_messages_required_to_save, 512
-            );
-            return Err(ConfigurationError::InvalidConfigurationValue);
-        }
-
-        if !self
-            .size_of_messages_required_to_save
-            .as_bytes_u64()
-            .is_multiple_of(512)
-        {
-            error!(
-                "Configured system.partition.size_of_messages_required_to_save 
{} is not a multiple of 512 B",
-                self.size_of_messages_required_to_save
-            );
+        if self.messages_required_to_save == 0 {
+            eprintln!("Configured system.partition.messages_required_to_save 
cannot be 0");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -194,7 +176,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
 impl Validatable<ConfigurationError> for SegmentConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.size > SEGMENT_MAX_SIZE_BYTES {
-            error!(
+            eprintln!(
                 "Configured system.segment.size {} B is greater than maximum 
{} B",
                 self.size.as_bytes_u64(),
                 SEGMENT_MAX_SIZE_BYTES
@@ -203,7 +185,7 @@ impl Validatable<ConfigurationError> for SegmentConfig {
         }
 
         if !self.size.as_bytes_u64().is_multiple_of(512) {
-            error!(
+            eprintln!(
                 "Configured system.segment.size {} B is not a multiple of 512 
B",
                 self.size.as_bytes_u64()
             );
@@ -217,6 +199,7 @@ impl Validatable<ConfigurationError> for SegmentConfig {
 impl Validatable<ConfigurationError> for MessageSaverConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.enabled && self.interval.is_zero() {
+            eprintln!("message_saver.interval cannot be zero when 
message_saver is enabled");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -236,6 +219,7 @@ impl Validatable<ConfigurationError> for 
DataMaintenanceConfig {
 impl Validatable<ConfigurationError> for MessagesMaintenanceConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.cleaner_enabled && self.interval.is_zero() {
+            eprintln!("data_maintenance.messages.interval cannot be zero when 
cleaner is enabled");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -246,10 +230,14 @@ impl Validatable<ConfigurationError> for 
MessagesMaintenanceConfig {
 impl Validatable<ConfigurationError> for PersonalAccessTokenConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.max_tokens_per_user == 0 {
+            eprintln!("personal_access_token.max_tokens_per_user cannot be 0");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         if self.cleaner.enabled && self.cleaner.interval.is_zero() {
+            eprintln!(
+                "personal_access_token.cleaner.interval cannot be zero when 
cleaner is enabled"
+            );
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -260,12 +248,12 @@ impl Validatable<ConfigurationError> for 
PersonalAccessTokenConfig {
 impl Validatable<ConfigurationError> for LoggingConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.level.is_empty() {
-            error!("system.logging.level is supposed be configured");
+            eprintln!("system.logging.level is supposed to be configured");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         if self.retention.as_secs() < 1 {
-            error!(
+            eprintln!(
                 "Configured system.logging.retention {} is less than minimum 1 
second",
                 self.retention
             );
@@ -273,7 +261,7 @@ impl Validatable<ConfigurationError> for LoggingConfig {
         }
 
         if self.rotation_check_interval.as_secs() < 1 {
-            error!(
+            eprintln!(
                 "Configured system.logging.rotation_check_interval {} is less 
than minimum 1 second",
                 self.rotation_check_interval
             );
@@ -284,7 +272,7 @@ impl Validatable<ConfigurationError> for LoggingConfig {
         if !max_total_size_unlimited
             && self.max_file_size.as_bytes_u64() > 
self.max_total_size.as_bytes_u64()
         {
-            error!(
+            eprintln!(
                 "Configured system.logging.max_total_size {} is less than 
system.logging.max_file_size {}",
                 self.max_total_size, self.max_file_size
             );
@@ -298,7 +286,7 @@ impl Validatable<ConfigurationError> for LoggingConfig {
 impl Validatable<ConfigurationError> for MemoryPoolConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.enabled && self.size == 0 {
-            error!(
+            eprintln!(
                 "Configured system.memory_pool.enabled is true and 
system.memory_pool.size is 0"
             );
             return Err(ConfigurationError::InvalidConfigurationValue);
@@ -309,7 +297,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
         const DEFAULT_PAGE_SIZE: u64 = 4096;
 
         if self.enabled && self.size < MIN_POOL_SIZE {
-            error!(
+            eprintln!(
                 "Configured system.memory_pool.size {} B ({} MiB) is less than 
minimum {} B, ({} MiB)",
                 self.size.as_bytes_u64(),
                 self.size.as_bytes_u64() / (1024 * 1024),
@@ -320,7 +308,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
         }
 
         if self.enabled && 
!self.size.as_bytes_u64().is_multiple_of(DEFAULT_PAGE_SIZE) {
-            error!(
+            eprintln!(
                 "Configured system.memory_pool.size {} B is not a multiple of 
default page size {} B",
                 self.size.as_bytes_u64(),
                 DEFAULT_PAGE_SIZE
@@ -329,7 +317,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
         }
 
         if self.enabled && self.bucket_capacity < MIN_BUCKET_CAPACITY {
-            error!(
+            eprintln!(
                 "Configured system.memory_pool.buffers {} is less than minimum 
{}",
                 self.bucket_capacity, MIN_BUCKET_CAPACITY
             );
@@ -337,7 +325,7 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig {
         }
 
         if self.enabled && !self.bucket_capacity.is_power_of_two() {
-            error!(
+            eprintln!(
                 "Configured system.memory_pool.buffers {} is not a power of 2",
                 self.bucket_capacity
             );
@@ -358,11 +346,11 @@ impl Validatable<ConfigurationError> for ShardingConfig {
             CpuAllocation::All => Ok(()),
             CpuAllocation::Count(count) => {
                 if *count == 0 {
-                    error!("Invalid sharding configuration: cpu_allocation 
count cannot be 0");
+                    eprintln!("Invalid sharding configuration: cpu_allocation 
count cannot be 0");
                     return Err(ConfigurationError::InvalidConfigurationValue);
                 }
                 if *count > available_cpus {
-                    error!(
+                    eprintln!(
                         "Invalid sharding configuration: cpu_allocation count 
{count} exceeds available CPU cores {available_cpus}"
                     );
                     return Err(ConfigurationError::InvalidConfigurationValue);
@@ -371,13 +359,13 @@ impl Validatable<ConfigurationError> for ShardingConfig {
             }
             CpuAllocation::Range(start, end) => {
                 if start >= end {
-                    error!(
+                    eprintln!(
                         "Invalid sharding configuration: cpu_allocation range 
{start}..{end} is invalid (start must be less than end)"
                     );
                     return Err(ConfigurationError::InvalidConfigurationValue);
                 }
                 if *end > available_cpus {
-                    error!(
+                    eprintln!(
                         "Invalid sharding configuration: cpu_allocation range 
{start}..{end} exceeds available CPU cores (max: {available_cpus})"
                     );
                     return Err(ConfigurationError::InvalidConfigurationValue);
@@ -387,12 +375,12 @@ impl Validatable<ConfigurationError> for ShardingConfig {
             CpuAllocation::NumaAware(numa_config) => match 
NumaTopology::detect() {
                 // TODO: dry the validation, already validate it from the 
shard allocation
                 Ok(topology) => numa_config.validate(&topology).map_err(|e| {
-                    error!("Invalid NUMA configuration: {}", e);
+                    eprintln!("Invalid NUMA configuration: {}", e);
                     ConfigurationError::InvalidConfigurationValue
                 }),
                 Err(e) => {
-                    error!("Failed to detect NUMA topology: {}", e);
-                    error!("NUMA allocation requested but system doesn't 
support it");
+                    eprintln!("Failed to detect NUMA topology: {}", e);
+                    eprintln!("NUMA allocation requested but system doesn't 
support it");
                     Err(ConfigurationError::InvalidConfigurationValue)
                 }
             },
@@ -408,13 +396,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
 
         // Validate cluster name is not empty
         if self.name.trim().is_empty() {
-            error!("Invalid cluster configuration: cluster name cannot be 
empty");
+            eprintln!("Invalid cluster configuration: cluster name cannot be 
empty");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         // Validate current node name is not empty
         if self.node.current.name.trim().is_empty() {
-            error!("Invalid cluster configuration: current node name cannot be 
empty");
+            eprintln!("Invalid cluster configuration: current node name cannot 
be empty");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -424,7 +412,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
 
         for node in &self.node.others {
             if !node_names.insert(node.name.clone()) {
-                error!(
+                eprintln!(
                     "Invalid cluster configuration: duplicate node name '{}' 
found",
                     node.name
                 );
@@ -437,13 +425,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
         for node in &self.node.others {
             // Validate node name is not empty
             if node.name.trim().is_empty() {
-                error!("Invalid cluster configuration: node name cannot be 
empty");
+                eprintln!("Invalid cluster configuration: node name cannot be 
empty");
                 return Err(ConfigurationError::InvalidConfigurationValue);
             }
 
             // Validate IP is not empty
             if node.ip.trim().is_empty() {
-                error!(
+                eprintln!(
                     "Invalid cluster configuration: IP cannot be empty for 
node '{}'",
                     node.name
                 );
@@ -461,7 +449,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
             for (name, port_opt) in &port_list {
                 if let Some(port) = port_opt {
                     if *port == 0 {
-                        error!(
+                        eprintln!(
                             "Invalid cluster configuration: {} port cannot be 
0 for node '{}'",
                             name, node.name
                         );
@@ -471,7 +459,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
                     // Check for port conflicts across nodes on the same IP
                     let endpoint = format!("{}:{}:{}", node.ip, name, port);
                     if !used_endpoints.insert(endpoint.clone()) {
-                        error!(
+                        eprintln!(
                             "Invalid cluster configuration: port conflict - 
{}:{} is already used",
                             node.ip, port
                         );
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index cc2b3a75c..008495743 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -595,7 +595,7 @@ impl IggyShard {
             return Ok(0);
         }
 
-        let (messages_writer, index_writer, segment_index) = {
+        let (messages_writer, index_writer) = {
             let partitions = self.local_partitions.borrow();
             let partition = partitions
                 .get(namespace)
@@ -605,7 +605,6 @@ impl IggyShard {
                 return Ok(0);
             }
 
-            let segment_index = partition.log.segments().len() - 1;
             let messages_writer = partition
                 .log
                 .active_storage()
@@ -620,7 +619,7 @@ impl IggyShard {
                 .as_ref()
                 .expect("Index writer not initialized")
                 .clone();
-            (messages_writer, index_writer, segment_index)
+            (messages_writer, index_writer)
         };
 
         let saved = messages_writer
@@ -633,6 +632,7 @@ impl IggyShard {
             let partition = partitions
                 .get(namespace)
                 .expect("local_partitions: partition must exist");
+            let segment_index = partition.log.segments().len() - 1;
             partition.log.indexes()[segment_index]
                 .as_ref()
                 .expect("indexes must exist for segment being persisted")
@@ -657,6 +657,9 @@ impl IggyShard {
                 .get_mut(namespace)
                 .expect("local_partitions: partition must exist");
 
+            // Recalculate index: segment deletion during async I/O shifts 
indices
+            let segment_index = partition.log.segments().len() - 1;
+
             let indexes = partition.log.indexes_mut()[segment_index]
                 .as_mut()
                 .expect("indexes must exist for segment being persisted");
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 8b32dce6c..4c3a04a85 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -241,11 +241,7 @@ impl IggyShard {
             );
 
             let start_offset = 0;
-            let segment = Segment::new(
-                start_offset,
-                self.config.system.segment.size,
-                self.config.system.segment.message_expiry,
-            );
+            let segment = Segment::new(start_offset, 
self.config.system.segment.size);
 
             let storage = create_segment_storage(
                 &self.config.system,
diff --git a/core/server/src/shard/system/segments.rs 
b/core/server/src/shard/system/segments.rs
index fb4a68496..38e91fd2d 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -121,8 +121,6 @@ impl IggyShard {
         Ok(())
     }
 
-    /// Initialize a new segment in local_partitions.
-    /// Used when partition data is in local_partitions (not slabs).
     async fn init_log_in_local_partitions(
         &self,
         namespace: &IggyNamespace,
@@ -130,11 +128,7 @@ impl IggyShard {
         use crate::streaming::segments::storage::create_segment_storage;
 
         let start_offset = 0;
-        let segment = Segment::new(
-            start_offset,
-            self.config.system.segment.size,
-            self.config.system.segment.message_expiry,
-        );
+        let segment = Segment::new(start_offset, 
self.config.system.segment.size);
 
         let storage = create_segment_storage(
             &self.config.system,
@@ -161,6 +155,7 @@ impl IggyShard {
 
     /// Rotate to a new segment when the current segment is full.
     /// The new segment starts at the next offset after the current segment's 
end.
+    /// Seals the old segment so it becomes eligible for expiry-based cleanup.
     pub(crate) async fn rotate_segment_in_local_partitions(
         &self,
         namespace: &IggyNamespace,
@@ -168,18 +163,16 @@ impl IggyShard {
         use crate::streaming::segments::storage::create_segment_storage;
 
         let start_offset = {
-            let partitions = self.local_partitions.borrow();
+            let mut partitions = self.local_partitions.borrow_mut();
             let partition = partitions
-                .get(namespace)
+                .get_mut(namespace)
                 .expect("rotate_segment: partition must exist");
-            partition.log.active_segment().end_offset + 1
+            let active_segment = partition.log.active_segment_mut();
+            active_segment.sealed = true;
+            active_segment.end_offset + 1
         };
 
-        let segment = Segment::new(
-            start_offset,
-            self.config.system.segment.size,
-            self.config.system.segment.message_expiry,
-        );
+        let segment = Segment::new(start_offset, 
self.config.system.segment.size);
 
         let storage = create_segment_storage(
             &self.config.system,
@@ -197,9 +190,9 @@ impl IggyShard {
             partition.log.add_persisted_segment(segment, storage);
             partition.stats.increment_segments_count(1);
             tracing::info!(
-                "Rotated to new segment at offset {} for partition {:?}",
+                "Rotated to new segment at offset {} for partition {}",
                 start_offset,
-                namespace
+                namespace.partition_id()
             );
         }
         Ok(())
diff --git a/core/server/src/shard/tasks/periodic/message_cleaner.rs 
b/core/server/src/shard/tasks/periodic/message_cleaner.rs
index 62a0a7a24..5e33428c2 100644
--- a/core/server/src/shard/tasks/periodic/message_cleaner.rs
+++ b/core/server/src/shard/tasks/periodic/message_cleaner.rs
@@ -18,7 +18,7 @@
 
 use crate::shard::IggyShard;
 use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyError, IggyTimestamp, MaxTopicSize};
+use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
 use std::rc::Rc;
 use tracing::{debug, error, info, trace, warn};
 
@@ -52,7 +52,6 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> 
Result<(), IggyError> {
 
     let namespaces = shard.get_current_shard_namespaces();
     let now = IggyTimestamp::now();
-    let delete_oldest_segments = 
shard.config.system.topic.delete_oldest_segments;
 
     let mut topics: std::collections::HashMap<(usize, usize), Vec<usize>> =
         std::collections::HashMap::new();
@@ -73,8 +72,8 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
         let mut topic_deleted_segments = 0u64;
         let mut topic_deleted_messages = 0u64;
 
-        for partition_id in partition_ids {
-            // Handle expired segments
+        // Phase 1: Time-based expiry cleanup per partition
+        for &partition_id in &partition_ids {
             let expired_result =
                handle_expired_segments(&shard, stream_id, topic_id, partition_id, now).await;
 
@@ -90,24 +89,22 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
                     );
                 }
             }
+        }
 
-            // Handle oldest segments if topic size management is enabled
-            if delete_oldest_segments {
-                let oldest_result =
-                    handle_oldest_segments(&shard, stream_id, topic_id, partition_id).await;
-
-                match oldest_result {
-                    Ok(deleted) => {
-                        topic_deleted_segments += deleted.segments_count;
-                        topic_deleted_messages += deleted.messages_count;
-                    }
-                    Err(err) => {
-                        error!(
-                            "Failed to clean oldest segments for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
-                            stream_id, topic_id, partition_id, err
-                        );
-                    }
-                }
+        // Phase 2: Size-based cleanup at topic level (fair across partitions)
+        let size_result =
+            handle_size_based_cleanup(&shard, stream_id, topic_id, &partition_ids).await;
+
+        match size_result {
+            Ok(deleted) => {
+                topic_deleted_segments += deleted.segments_count;
+                topic_deleted_messages += deleted.messages_count;
+            }
+            Err(err) => {
+                error!(
+                    "Failed to clean segments by size for stream ID: {}, topic ID: {}. Error: {}",
+                    stream_id, topic_id, err
+                );
             }
         }
 
@@ -119,7 +116,6 @@ async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
             total_deleted_segments += topic_deleted_segments;
             total_deleted_messages += topic_deleted_messages;
 
-            // Update metrics
             shard
                 .metrics
                 .decrement_segments(topic_deleted_segments as u32);
@@ -148,6 +144,13 @@ struct DeletedSegments {
     pub messages_count: u64,
 }
 
+impl DeletedSegments {
+    fn add(&mut self, other: &DeletedSegments) {
+        self.segments_count += other.segments_count;
+        self.messages_count += other.messages_count;
+    }
+}
+
 async fn handle_expired_segments(
     shard: &Rc<IggyShard>,
     stream_id: usize,
@@ -157,18 +160,29 @@ async fn handle_expired_segments(
 ) -> Result<DeletedSegments, IggyError> {
     let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
 
-    // Scope the borrow to avoid holding across await
+    let expiry = shard
+        .metadata
+        .get_topic_config(stream_id, topic_id)
+        .map(|(exp, _)| exp)
+        .unwrap_or(shard.config.system.topic.message_expiry);
+
+    if matches!(expiry, IggyExpiry::NeverExpire) {
+        return Ok(DeletedSegments::default());
+    }
+
     let expired_segment_offsets: Vec<u64> = {
         let partitions = shard.local_partitions.borrow();
         let Some(partition) = partitions.get(&ns) else {
             return Ok(DeletedSegments::default());
         };
-        partition
-            .log
-            .segments()
+        let segments = partition.log.segments();
+        let last_idx = segments.len().saturating_sub(1);
+
+        segments
             .iter()
-            .filter(|segment| segment.is_expired(now))
-            .map(|segment| segment.start_offset)
+            .enumerate()
+            .filter(|(idx, segment)| *idx != last_idx && segment.is_expired(now, expiry))
+            .map(|(_, segment)| segment.start_offset)
             .collect()
     };
 
@@ -194,13 +208,15 @@ async fn handle_expired_segments(
     .await
 }
 
-async fn handle_oldest_segments(
+/// Handles size-based cleanup at the topic level.
+/// Deletes the globally oldest sealed segment across all partitions until topic size is below 90% threshold.
+async fn handle_size_based_cleanup(
     shard: &Rc<IggyShard>,
     stream_id: usize,
     topic_id: usize,
-    partition_id: usize,
+    partition_ids: &[usize],
 ) -> Result<DeletedSegments, IggyError> {
-    let Some((max_size, current_size)) = shard.metadata.with_metadata(|m| {
+    let Some((max_size, _)) = shard.metadata.with_metadata(|m| {
         m.streams
             .get(stream_id)
             .and_then(|s| s.topics.get(topic_id))
@@ -209,58 +225,109 @@ async fn handle_oldest_segments(
         return Ok(DeletedSegments::default());
     };
 
-    let is_unlimited = matches!(max_size, MaxTopicSize::Unlimited);
-
-    if is_unlimited {
-        debug!(
-            "Topic is unlimited, oldest segments will not be deleted for stream ID: {}, topic ID: {}, partition ID: {}",
-            stream_id, topic_id, partition_id
-        );
+    if matches!(max_size, MaxTopicSize::Unlimited) {
         return Ok(DeletedSegments::default());
     }
 
     let max_bytes = max_size.as_bytes_u64();
-    let is_almost_full = current_size >= (max_bytes * 9 / 10);
+    let threshold = max_bytes * 9 / 10;
+
+    let mut total_deleted = DeletedSegments::default();
+
+    loop {
+        let current_size = shard
+            .metadata
+            .with_metadata(|m| {
+                m.streams
+                    .get(stream_id)
+                    .and_then(|s| s.topics.get(topic_id))
+                    .map(|t| t.stats.size_bytes_inconsistent())
+            })
+            .unwrap_or(0);
+
+        if current_size < threshold {
+            break;
+        }
 
-    if !is_almost_full {
-        debug!(
-            "Topic is not almost full, oldest segments will not be deleted for 
stream ID: {}, topic ID: {}, partition ID: {}",
-            stream_id, topic_id, partition_id
+        let Some((target_partition_id, target_offset, target_timestamp)) =
+            find_oldest_segment_in_shard(shard, stream_id, topic_id, 
partition_ids)
+        else {
+            debug!(
+                "No deletable segments found for stream ID: {}, topic ID: {} 
(all partitions have only active segment)",
+                stream_id, topic_id
+            );
+            break;
+        };
+
+        info!(
+            "Deleting oldest segment (start_offset: {}, timestamp: {}) from 
partition {} for stream ID: {}, topic ID: {}",
+            target_offset, target_timestamp, target_partition_id, stream_id, 
topic_id
         );
-        return Ok(DeletedSegments::default());
+
+        let deleted = delete_segments(
+            shard,
+            stream_id,
+            topic_id,
+            target_partition_id,
+            &[target_offset],
+        )
+        .await?;
+        total_deleted.add(&deleted);
+
+        if deleted.segments_count == 0 {
+            break;
+        }
     }
 
-    let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+    Ok(total_deleted)
+}
 
-    // Scope the borrow to avoid holding across await
-    let oldest_segment_offset = {
-        let partitions = shard.local_partitions.borrow();
-        partitions.get(&ns).and_then(|partition| {
-            let segments = partition.log.segments();
-            // Find the first closed segment (not the active one)
-            if segments.len() > 1 {
-                // The last segment is always active, so we look at earlier 
ones
-                segments.first().map(|s| s.start_offset)
-            } else {
-                None
-            }
-        })
-    };
+/// Finds the oldest sealed segment across partitions owned by this shard.
+/// For each partition, the first segment in the vector is the oldest (segments are ordered).
+/// Compares first segments across partitions by timestamp to ensure fair deletion.
+/// Returns (partition_id, start_offset, start_timestamp) or None if no deletable segments exist.
+fn find_oldest_segment_in_shard(
+    shard: &Rc<IggyShard>,
+    stream_id: usize,
+    topic_id: usize,
+    partition_ids: &[usize],
+) -> Option<(usize, u64, u64)> {
+    let partitions = shard.local_partitions.borrow();
 
-    if let Some(start_offset) = oldest_segment_offset {
-        info!(
-            "Deleting oldest segment with start offset {} for stream ID: {}, 
topic ID: {}, partition ID: {}",
-            start_offset, stream_id, topic_id, partition_id
-        );
+    let mut oldest: Option<(usize, u64, u64)> = None;
 
-        delete_segments(shard, stream_id, topic_id, partition_id, 
&[start_offset]).await
-    } else {
-        debug!(
-            "No closed segments found to delete for stream ID: {}, topic ID: 
{}, partition ID: {}",
-            stream_id, topic_id, partition_id
+    for &partition_id in partition_ids {
+        let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
+        let Some(partition) = partitions.get(&ns) else {
+            continue;
+        };
+
+        let segments = partition.log.segments();
+        if segments.len() <= 1 {
+            continue;
+        }
+
+        // First segment is the oldest in this partition (segments are ordered chronologically)
+        let first_segment = &segments[0];
+        if !first_segment.sealed {
+            continue;
+        }
+
+        let candidate = (
+            partition_id,
+            first_segment.start_offset,
+            first_segment.start_timestamp,
         );
-        Ok(DeletedSegments::default())
+        match &oldest {
+            None => oldest = Some(candidate),
+            Some((_, _, oldest_ts)) if first_segment.start_timestamp < *oldest_ts => {
+                oldest = Some(candidate);
+            }
+            _ => {}
+        }
     }
+
+    oldest
 }
 
 async fn delete_segments(
@@ -287,7 +354,6 @@ async fn delete_segments(
 
     let ns = IggyNamespace::new(stream_id, topic_id, partition_id);
 
-    // Extract segments and storages to delete from local_partitions
     let (stats, segments_to_delete, mut storages_to_delete) = {
         let mut partitions = shard.local_partitions.borrow_mut();
         let Some(partition) = partitions.get_mut(&ns) else {
@@ -334,7 +400,7 @@ async fn delete_segments(
         let start_offset = segment.start_offset;
         let end_offset = segment.end_offset;
 
-        let approx_messages = if (end_offset - start_offset) == 0 {
+        let messages_in_segment = if start_offset == end_offset {
             0
         } else {
             (end_offset - start_offset) + 1
@@ -362,14 +428,6 @@ async fn delete_segments(
             } else {
                 trace!("Deleted index file: {}", path);
             }
-
-            let time_index_path = path.replace(".index", ".timeindex");
-            if let Err(e) = compio::fs::remove_file(&time_index_path).await {
-                trace!(
-                    "Could not delete time index file {}: {}",
-                    time_index_path, e
-                );
-            }
         } else {
             warn!(
                 "Index writer path not found for segment starting at offset 
{}",
@@ -379,15 +437,15 @@ async fn delete_segments(
 
         stats.decrement_size_bytes(segment_size);
         stats.decrement_segments_count(1);
-        stats.decrement_messages_count(messages_count);
+        stats.decrement_messages_count(messages_in_segment);
 
         info!(
            "Deleted segment with start offset {} (end: {}, size: {}, messages: {}) from partition ID: {}",
-            start_offset, end_offset, segment_size, approx_messages, partition_id
+            start_offset, end_offset, segment_size, messages_in_segment, partition_id
         );
 
         segments_count += 1;
-        messages_count += approx_messages;
+        messages_count += messages_in_segment;
     }
 
     Ok(DeletedSegments {
diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs
index fe653b44f..d73925b08 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -21,24 +21,22 @@ use std::fmt::Display;
 #[derive(Default, Debug, Clone)]
 pub struct Segment {
     pub sealed: bool,
-    pub message_expiry: IggyExpiry,
     pub start_timestamp: u64,
     pub end_timestamp: u64,
     pub current_position: u32,
     pub start_offset: u64,
     pub end_offset: u64,
-    pub size: IggyByteSize,     // u64
-    pub max_size: IggyByteSize, // u64
+    pub size: IggyByteSize,
+    pub max_size: IggyByteSize,
 }
 
 impl Display for Segment {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "Segment {{ sealed: {}, max_size_bytes: {}, message_expiry: {:?}, start_timestamp: {}, end_timestamp: {}, start_offset: {}, end_offset: {}, size: {} }}",
+            "Segment {{ sealed: {}, max_size: {}, start_timestamp: {}, end_timestamp: {}, start_offset: {}, end_offset: {}, size: {} }}",
             self.sealed,
             self.max_size,
-            self.message_expiry,
             self.start_timestamp,
             self.end_timestamp,
             self.start_offset,
@@ -49,16 +47,10 @@ impl Display for Segment {
 }
 
 impl Segment {
-    /// Creates a new Segment with the specified parameters
-    pub fn new(
-        start_offset: u64,
-        max_size_bytes: IggyByteSize,
-        message_expiry: IggyExpiry,
-    ) -> Self {
+    pub fn new(start_offset: u64, max_size_bytes: IggyByteSize) -> Self {
         Self {
             sealed: false,
             max_size: max_size_bytes,
-            message_expiry,
             start_timestamp: 0,
             end_timestamp: 0,
             start_offset,
@@ -69,24 +61,19 @@ impl Segment {
     }
 
     pub fn is_full(&self) -> bool {
-        if self.size >= self.max_size {
-            return true;
-        }
-
-        self.is_expired(IggyTimestamp::now())
+        self.size >= self.max_size
     }
 
-    pub fn is_expired(&self, now: IggyTimestamp) -> bool {
+    pub fn is_expired(&self, now: IggyTimestamp, expiry: IggyExpiry) -> bool {
         if !self.sealed {
             return false;
         }
 
-        match self.message_expiry {
+        match expiry {
             IggyExpiry::NeverExpire => false,
             IggyExpiry::ServerDefault => false,
-            IggyExpiry::ExpireDuration(expiry) => {
-                let last_message_timestamp = self.end_timestamp;
-                last_message_timestamp + expiry.as_micros() <= now.as_micros()
+            IggyExpiry::ExpireDuration(duration) => {
+                self.end_timestamp + duration.as_micros() <= now.as_micros()
             }
         }
     }

Reply via email to