This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_final
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 6454be16aff108f969edb89729c7641da1057ebb
Author: numminex <[email protected]>
AuthorDate: Thu Nov 6 17:00:17 2025 +0100

    bring back the message cleaner task
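
    Restores the periodic cleaner task that deletes expired segments and,
    when `system.topic.delete_oldest_segments` is enabled, the oldest
    closed segment of topics that are close to their size limit. The
    standalone [message_cleaner] section and the disk/S3 archiver
    configuration are removed; the cleaner is configured under
    [data_maintenance.messages] instead.

    Minimal configuration sketch (values are illustrative; the shipped
    default leaves the cleaner disabled):

        [data_maintenance.messages]
        # Enables or disables the expired message cleaner process.
        cleaner_enabled = true
        # Interval for running the message cleaner.
        interval = "1 m"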
---
 core/configs/server.toml                           |  45 +--
 core/server/src/configs/defaults.rs                |   1 -
 core/server/src/configs/displays.rs                |  32 +-
 core/server/src/configs/server.rs                  |  16 -
 core/server/src/configs/validators.rs              |   2 +-
 core/server/src/shard/mod.rs                       |   4 +
 .../src/shard/tasks/periodic/message_cleaner.rs    | 392 +++++++++++++++++++++
 core/server/src/shard/tasks/periodic/mod.rs        |   2 +
 core/server/src/state/file.rs                      |   2 -
 .../src/streaming/segments/indexes/index_reader.rs |   4 +
 .../streaming/segments/messages/messages_reader.rs |   4 +
 core/server/src/streaming/segments/storage.rs      |   6 +
 core/server/src/streaming/topics/helpers.rs        |  24 ++
 13 files changed, 444 insertions(+), 90 deletions(-)

diff --git a/core/configs/server.toml b/core/configs/server.toml
index ea1be6c66..ef7fa96be 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -15,45 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[data_maintenance.archiver]
-# Enables or disables the archiver process.
-enabled = false
-
-# Kind of archiver to use. Available options: "disk".
-kind = "disk"
-
-[data_maintenance.archiver.disk]
-# Path for storing the archived data on disk.
-path = "local_data/archive"
-
-[data_maintenance.archiver.s3]
-# Access key ID for the S3 bucket.
-key_id = "123"
-
-# Secret access key for the S3 bucket
-key_secret = "secret"
-
-# Name of the S3 bucket.
-bucket = "iggy"
-
-# Endpoint of the S3 region.
-endpoint = "http://localhost:9000";
-
-# Region of the S3 bucket.
-region = "eu-west-1"
-
-# Temporary directory for storing the data before uploading to S3.
-tmp_upload_dir = "local_data/s3_tmp"
-
 [data_maintenance.messages]
-# Enables or disables the archiver process for closed segments containing messages.
-archiver_enabled = false
-
 # Enables or disables the expired message cleaner process.
 cleaner_enabled = false
 
 # Interval for running the message archiver and cleaner.
-interval = "35s"
+interval = "1 m"
 
 [data_maintenance.state]
 # Enables or disables the archiver process for state log.
@@ -285,16 +252,6 @@ send_buffer_size = "64 KB"
 # SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection
 keepalive = false
 
-# Message cleaner configuration.
-[message_cleaner]
-# Enables or disables the background process for deleting expired messages.
-# `true` activates the message cleaner.
-# `false` turns it off, messages will not be auto-deleted based on expiry.
-enabled = true
-
-# Interval for running the message cleaner.
-interval = "1 m"
-
 # Message saver configuration.
 [message_saver]
 # Enables or disables the background process for saving buffered data to disk.
diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs
index 7aed1e606..2b66dc7f3 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -67,7 +67,6 @@ impl Default for ServerConfig {
 impl Default for MessagesMaintenanceConfig {
     fn default() -> MessagesMaintenanceConfig {
         MessagesMaintenanceConfig {
-            archiver_enabled: SERVER_CONFIG.data_maintenance.messages.archiver_enabled,
             cleaner_enabled: SERVER_CONFIG.data_maintenance.messages.cleaner_enabled,
             interval: SERVER_CONFIG
                 .data_maintenance
diff --git a/core/server/src/configs/displays.rs b/core/server/src/configs/displays.rs
index fe66dbedd..c70aeffb5 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -18,9 +18,8 @@
 
 use crate::configs::quic::{QuicCertificateConfig, QuicConfig};
 use crate::configs::server::{
-    DataMaintenanceConfig, DiskArchiverConfig, HeartbeatConfig, MessagesMaintenanceConfig,
-    S3ArchiverConfig, StateMaintenanceConfig, TelemetryConfig, TelemetryLogsConfig,
-    TelemetryTracesConfig,
+    DataMaintenanceConfig, HeartbeatConfig, MessagesMaintenanceConfig, StateMaintenanceConfig,
+    TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig,
 };
 use crate::configs::system::MessageDeduplicationConfig;
 use crate::configs::{
@@ -145,31 +144,12 @@ impl Display for DataMaintenanceConfig {
     }
 }
 
-impl Display for DiskArchiverConfig {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{{ path: {} }}", self.path)
-    }
-}
-
-impl Display for S3ArchiverConfig {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "{{ key_id: {}, bucket: {}, endpoint: {}. region: {} }}",
-            self.key_id,
-            self.bucket,
-            self.endpoint.as_deref().unwrap_or_default(),
-            self.region.as_deref().unwrap_or_default()
-        )
-    }
-}
-
 impl Display for MessagesMaintenanceConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ archiver_enabled: {}, cleaner_enabled: {}, interval: {} }}",
-            self.archiver_enabled, self.cleaner_enabled, self.interval
+            "{{ cleaner_enabled: {}, interval: {} }}",
+            self.cleaner_enabled, self.interval
         )
     }
 }
@@ -178,8 +158,8 @@ impl Display for StateMaintenanceConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ archiver_enabled: {}, overwrite: {}, interval: {} }}",
-            self.archiver_enabled, self.overwrite, self.interval
+            "{{ overwrite: {}, interval: {} }}",
+            self.overwrite, self.interval
         )
     }
 }
diff --git a/core/server/src/configs/server.rs b/core/server/src/configs/server.rs
index 26b6ef033..18c095ebf 100644
--- a/core/server/src/configs/server.rs
+++ b/core/server/src/configs/server.rs
@@ -60,7 +60,6 @@ pub struct DataMaintenanceConfig {
 #[serde_as]
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub struct MessagesMaintenanceConfig {
-    pub archiver_enabled: bool,
     pub cleaner_enabled: bool,
     #[serde_as(as = "DisplayFromStr")]
     pub interval: IggyDuration,
@@ -75,21 +74,6 @@ pub struct StateMaintenanceConfig {
     pub interval: IggyDuration,
 }
 
-#[derive(Debug, Deserialize, Serialize, Clone)]
-pub struct DiskArchiverConfig {
-    pub path: String,
-}
-
-#[derive(Debug, Deserialize, Serialize, Clone)]
-pub struct S3ArchiverConfig {
-    pub key_id: String,
-    pub key_secret: String,
-    pub bucket: String,
-    pub endpoint: Option<String>,
-    pub region: Option<String>,
-    pub tmp_upload_dir: String,
-}
-
 #[serde_as]
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub struct MessageSaverConfig {
diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs
index 9f1e4eea7..2d4d05f15 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -212,7 +212,7 @@ impl Validatable<ConfigError> for DataMaintenanceConfig {
 
 impl Validatable<ConfigError> for MessagesMaintenanceConfig {
     fn validate(&self) -> Result<(), ConfigError> {
-        if self.archiver_enabled && self.interval.is_zero() {
+        if self.cleaner_enabled && self.interval.is_zero() {
             return Err(ConfigError::InvalidConfiguration);
         }
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 05287c037..9d272c417 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -146,6 +146,10 @@ impl IggyShard {
             periodic::spawn_message_saver(self.clone());
         }
 
+        if self.config.data_maintenance.messages.cleaner_enabled {
+            periodic::spawn_message_cleaner(self.clone());
+        }
+
         if self.config.heartbeat.enabled {
             periodic::spawn_heartbeat_verifier(self.clone());
         }
diff --git a/core/server/src/shard/tasks/periodic/message_cleaner.rs b/core/server/src/shard/tasks/periodic/message_cleaner.rs
new file mode 100644
index 000000000..54f6f0f10
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/message_cleaner.rs
@@ -0,0 +1,392 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::streaming::topics::helpers as topics_helpers;
+use iggy_common::{Identifier, IggyError, IggyTimestamp};
+use std::rc::Rc;
+use tracing::{debug, error, info, trace, warn};
+
+pub fn spawn_message_cleaner(shard: Rc<IggyShard>) {
+    if !shard.config.data_maintenance.messages.cleaner_enabled {
+        info!("Message cleaner is disabled.");
+        return;
+    }
+
+    let period = shard
+        .config
+        .data_maintenance
+        .messages
+        .interval
+        .get_duration();
+    info!(
+        "Message cleaner is enabled, expired segments will be automatically 
deleted every: {:?}",
+        period
+    );
+    let shard_clone = shard.clone();
+    shard
+        .task_registry
+        .periodic("clean_messages")
+        .every(period)
+        .tick(move |_shutdown| clean_expired_messages(shard_clone.clone()))
+        .spawn();
+}
+
+async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+    trace!("Cleaning expired messages...");
+
+    let namespaces = shard.get_current_shard_namespaces();
+    let now = IggyTimestamp::now();
+    let delete_oldest_segments = shard.config.system.topic.delete_oldest_segments;
+
+    let mut topics: std::collections::HashMap<(usize, usize), Vec<usize>> =
+        std::collections::HashMap::new();
+
+    for ns in namespaces {
+        let stream_id = ns.stream_id();
+        let topic_id = ns.topic_id();
+        topics
+            .entry((stream_id, topic_id))
+            .or_default()
+            .push(ns.partition_id());
+    }
+
+    let mut total_deleted_segments = 0u64;
+    let mut total_deleted_messages = 0u64;
+
+    for ((stream_id, topic_id), partition_ids) in topics {
+        let stream_identifier = Identifier::numeric(stream_id as u32).unwrap();
+        let topic_identifier = Identifier::numeric(topic_id as u32).unwrap();
+
+        let mut topic_deleted_segments = 0u64;
+        let mut topic_deleted_messages = 0u64;
+
+        for partition_id in partition_ids {
+            // Handle expired segments
+            let expired_result = handle_expired_segments(
+                &shard,
+                &stream_identifier,
+                &topic_identifier,
+                partition_id,
+                now,
+            )
+            .await;
+
+            match expired_result {
+                Ok(deleted) => {
+                    topic_deleted_segments += deleted.segments_count;
+                    topic_deleted_messages += deleted.messages_count;
+                }
+                Err(err) => {
+                    error!(
+                        "Failed to clean expired segments for stream ID: {}, 
topic ID: {}, partition ID: {}. Error: {}",
+                        stream_id, topic_id, partition_id, err
+                    );
+                }
+            }
+
+            // Handle oldest segments if topic size management is enabled
+            if delete_oldest_segments {
+                let oldest_result = handle_oldest_segments(
+                    &shard,
+                    &stream_identifier,
+                    &topic_identifier,
+                    partition_id,
+                )
+                .await;
+
+                match oldest_result {
+                    Ok(deleted) => {
+                        topic_deleted_segments += deleted.segments_count;
+                        topic_deleted_messages += deleted.messages_count;
+                    }
+                    Err(err) => {
+                        error!(
+                            "Failed to clean oldest segments for stream ID: 
{}, topic ID: {}, partition ID: {}. Error: {}",
+                            stream_id, topic_id, partition_id, err
+                        );
+                    }
+                }
+            }
+        }
+
+        if topic_deleted_segments > 0 {
+            info!(
+                "Deleted {} segments and {} messages for stream ID: {}, topic 
ID: {}",
+                topic_deleted_segments, topic_deleted_messages, stream_id, 
topic_id
+            );
+            total_deleted_segments += topic_deleted_segments;
+            total_deleted_messages += topic_deleted_messages;
+
+            // Update metrics
+            shard
+                .metrics
+                .decrement_segments(topic_deleted_segments as u32);
+            shard.metrics.decrement_messages(topic_deleted_messages);
+        } else {
+            trace!(
+                "No segments were deleted for stream ID: {}, topic ID: {}",
+                stream_id, topic_id
+            );
+        }
+    }
+
+    if total_deleted_segments > 0 {
+        info!(
+            "Total cleaned: {} segments and {} messages",
+            total_deleted_segments, total_deleted_messages
+        );
+    }
+
+    Ok(())
+}
+
+#[derive(Debug, Default)]
+struct DeletedSegments {
+    pub segments_count: u64,
+    pub messages_count: u64,
+}
+
+async fn handle_expired_segments(
+    shard: &Rc<IggyShard>,
+    stream_id: &Identifier,
+    topic_id: &Identifier,
+    partition_id: usize,
+    now: IggyTimestamp,
+) -> Result<DeletedSegments, IggyError> {
+    // Get expired segments
+    let expired_segment_offsets =
+        shard
+            .streams
+            .with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                let mut expired = Vec::new();
+                for segment in log.segments() {
+                    if segment.is_expired(now) {
+                        expired.push(segment.start_offset);
+                    }
+                }
+                expired
+            });
+
+    if expired_segment_offsets.is_empty() {
+        return Ok(DeletedSegments::default());
+    }
+
+    debug!(
+        "Found {} expired segments for stream ID: {}, topic ID: {}, partition 
ID: {}",
+        expired_segment_offsets.len(),
+        stream_id,
+        topic_id,
+        partition_id
+    );
+
+    delete_segments(
+        shard,
+        stream_id,
+        topic_id,
+        partition_id,
+        &expired_segment_offsets,
+    )
+    .await
+}
+
+async fn handle_oldest_segments(
+    shard: &Rc<IggyShard>,
+    stream_id: &Identifier,
+    topic_id: &Identifier,
+    partition_id: usize,
+) -> Result<DeletedSegments, IggyError> {
+    let topic_info =
+        shard
+            .streams
+            .with_topic_by_id(stream_id, topic_id, topics_helpers::get_topic_size_info());
+
+    let (is_unlimited, is_almost_full) = topic_info;
+
+    if is_unlimited {
+        debug!(
+            "Topic is unlimited, oldest segments will not be deleted for 
stream ID: {}, topic ID: {}, partition ID: {}",
+            stream_id, topic_id, partition_id
+        );
+        return Ok(DeletedSegments::default());
+    }
+
+    if !is_almost_full {
+        debug!(
+            "Topic is not almost full, oldest segments will not be deleted for 
stream ID: {}, topic ID: {}, partition ID: {}",
+            stream_id, topic_id, partition_id
+        );
+        return Ok(DeletedSegments::default());
+    }
+
+    let oldest_segment_offset =
+        shard
+            .streams
+            .with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                let segments = log.segments();
+                // Find the first closed segment (not the active one)
+                if segments.len() > 1 {
+                    // The last segment is always active, so we look at earlier ones
+                    segments.first().map(|s| s.start_offset)
+                } else {
+                    None
+                }
+            });
+
+    if let Some(start_offset) = oldest_segment_offset {
+        info!(
+            "Deleting oldest segment with start offset {} for stream ID: {}, 
topic ID: {}, partition ID: {}",
+            start_offset, stream_id, topic_id, partition_id
+        );
+
+        delete_segments(shard, stream_id, topic_id, partition_id, &[start_offset]).await
+    } else {
+        debug!(
+            "No closed segments found to delete for stream ID: {}, topic ID: 
{}, partition ID: {}",
+            stream_id, topic_id, partition_id
+        );
+        Ok(DeletedSegments::default())
+    }
+}
+
+async fn delete_segments(
+    shard: &Rc<IggyShard>,
+    stream_id: &Identifier,
+    topic_id: &Identifier,
+    partition_id: usize,
+    segment_offsets: &[u64],
+) -> Result<DeletedSegments, IggyError> {
+    if segment_offsets.is_empty() {
+        return Ok(DeletedSegments::default());
+    }
+
+    info!(
+        "Deleting {} segments for stream ID: {}, topic ID: {}, partition ID: 
{}...",
+        segment_offsets.len(),
+        stream_id,
+        topic_id,
+        partition_id
+    );
+
+    let mut segments_count = 0u64;
+    let mut messages_count = 0u64;
+
+    let to_delete = shard.streams.with_partition_by_id_mut(
+        stream_id,
+        topic_id,
+        partition_id,
+        |(_, stats, .., log)| {
+            let mut segments_to_remove = Vec::new();
+            let mut storages_to_remove = Vec::new();
+
+            let mut indices_to_remove: Vec<usize> = Vec::new();
+            for &start_offset in segment_offsets {
+                if let Some(idx) = log
+                    .segments()
+                    .iter()
+                    .position(|s| s.start_offset == start_offset)
+                {
+                    indices_to_remove.push(idx);
+                }
+            }
+
+            indices_to_remove.sort_by(|a, b| b.cmp(a));
+            for idx in indices_to_remove {
+                let segment = log.segments_mut().remove(idx);
+                let storage = log.storages_mut().remove(idx);
+                log.indexes_mut().remove(idx);
+
+                segments_to_remove.push(segment);
+                storages_to_remove.push(storage);
+            }
+
+            (stats.clone(), segments_to_remove, storages_to_remove)
+        },
+    );
+
+    let (stats, segments_to_delete, mut storages_to_delete) = to_delete;
+
+    for (segment, storage) in segments_to_delete
+        .into_iter()
+        .zip(storages_to_delete.iter_mut())
+    {
+        let segment_size = segment.size.as_bytes_u64();
+        let start_offset = segment.start_offset;
+        let end_offset = segment.end_offset;
+
+        let approx_messages = if (end_offset - start_offset) == 0 {
+            0
+        } else {
+            (end_offset - start_offset) + 1
+        };
+
+        let _ = storage.shutdown();
+        let (messages_path, index_path) = storage.segment_and_index_paths();
+
+        if let Some(path) = messages_path {
+            if let Err(e) = compio::fs::remove_file(&path).await {
+                error!("Failed to delete messages file {}: {}", path, e);
+            } else {
+                trace!("Deleted messages file: {}", path);
+            }
+        } else {
+            warn!(
+                "Messages writer path not found for segment starting at offset 
{}",
+                start_offset
+            );
+        }
+
+        if let Some(path) = index_path {
+            if let Err(e) = compio::fs::remove_file(&path).await {
+                error!("Failed to delete index file {}: {}", path, e);
+            } else {
+                trace!("Deleted index file: {}", path);
+            }
+
+            let time_index_path = path.replace(".index", ".timeindex");
+            if let Err(e) = compio::fs::remove_file(&time_index_path).await {
+                trace!(
+                    "Could not delete time index file {}: {}",
+                    time_index_path, e
+                );
+            }
+        } else {
+            warn!(
+                "Index writer path not found for segment starting at offset 
{}",
+                start_offset
+            );
+        }
+
+        stats.decrement_size_bytes(segment_size);
+        stats.decrement_segments_count(1);
+        stats.decrement_messages_count(approx_messages);
+
+        info!(
+            "Deleted segment with start offset {} (end: {}, size: {}, 
messages: {}) from partition ID: {}",
+            start_offset, end_offset, segment_size, approx_messages, 
partition_id
+        );
+
+        segments_count += 1;
+        messages_count += approx_messages;
+    }
+
+    Ok(DeletedSegments {
+        segments_count,
+        messages_count,
+    })
+}
diff --git a/core/server/src/shard/tasks/periodic/mod.rs b/core/server/src/shard/tasks/periodic/mod.rs
index 9689060ae..d0694ffb7 100644
--- a/core/server/src/shard/tasks/periodic/mod.rs
+++ b/core/server/src/shard/tasks/periodic/mod.rs
@@ -18,12 +18,14 @@
 
 mod heartbeat_verifier;
 mod jwt_token_cleaner;
+mod message_cleaner;
 mod message_saver;
 mod personal_access_token_cleaner;
 mod sysinfo_printer;
 
 pub use heartbeat_verifier::spawn_heartbeat_verifier;
 pub use jwt_token_cleaner::spawn_jwt_token_cleaner;
+pub use message_cleaner::spawn_message_cleaner;
 pub use message_saver::spawn_message_saver;
 pub use personal_access_token_cleaner::spawn_personal_access_token_cleaner;
 pub use sysinfo_printer::spawn_sysinfo_printer;
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 4d978e00d..3109d5f24 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -151,7 +151,6 @@ impl FileState {
                 .await
                 .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}"))
                 .map_err(|_| IggyError::InvalidNumberEncoding)?;
-            tracing::warn!("index: {}", index);
             total_size += 8;
             // Greater than one, because one of the entries after a fresh reboot is the default root user.
             if entries_count > 1 && index != current_index + 1 {
@@ -264,7 +263,6 @@ impl FileState {
             EntryCommand::from_bytes(command.clone()).with_error(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to parse entry 
command from bytes")
             })?;
-            tracing::warn!("command: {:?}", command);
             let calculated_checksum = StateEntry::calculate_checksum(
                 index, term, leader_id, version, flags, timestamp, user_id, &context, &command,
             );
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs
index f37862beb..d58035e18 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -65,6 +65,10 @@ impl IndexReader {
         })
     }
 
+    pub fn path(&self) -> String {
+        self.file_path.clone()
+    }
+
     /// Loads all indexes from the index file into the optimized binary format.
     /// Note that this function does not use the pool, as the messages are not cached.
     /// This is expected - this method is called at startup and we want to preserve
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs
index 963510a60..3a419947b 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -84,6 +84,10 @@ impl MessagesReader {
         })
     }
 
+    pub fn path(&self) -> String {
+        self.file_path.clone()
+    }
+
     /// Loads and returns all message IDs from the messages file.
     /// Note that this function does not use the pool, as the messages are not cached.
     /// This is expected - this method is called at startup and we want to preserve
diff --git a/core/server/src/streaming/segments/storage.rs b/core/server/src/streaming/segments/storage.rs
index b21936650..9b04d10bb 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -75,6 +75,12 @@ impl Storage {
         let index_writer = self.index_writer.take();
         (messages_writer, index_writer)
     }
+
+    pub fn segment_and_index_paths(&self) -> (Option<String>, Option<String>) {
+        let index_path = self.index_reader.as_ref().map(|reader| reader.path());
+        let segment_path = self.messages_reader.as_ref().map(|reader| reader.path());
+        (segment_path, index_path)
+    }
 }
 
 /// Creates a new storage for the specified partition with the given start offset
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 433d03ac0..1e0707d1b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -80,6 +80,30 @@ pub fn get_max_topic_size() -> impl FnOnce(ComponentsById<TopicRef>) -> MaxTopic
     |(root, _, _)| root.max_topic_size()
 }
 
+pub fn get_topic_size_info() -> impl FnOnce(ComponentsById<TopicRef>) -> (bool, bool) {
+    |(root, _, stats)| {
+        let max_size = root.max_topic_size();
+        let current_size = stats.size_bytes_inconsistent();
+        let is_unlimited = matches!(max_size, MaxTopicSize::Unlimited);
+        let is_almost_full = if !is_unlimited {
+            let max_bytes = max_size.as_bytes_u64();
+            // Consider "almost full" as 90% capacity
+            current_size >= (max_bytes * 9 / 10)
+        } else {
+            false
+        };
+        tracing::debug!(
+            "Topic ID: {}, current_size: {}, max_size: {:?}, is_unlimited: {}, 
is_almost_full: {}",
+            root.id(),
+            current_size,
+            max_size,
+            is_unlimited,
+            is_almost_full
+        );
+        (is_unlimited, is_almost_full)
+    }
+}
+
 pub fn calculate_partition_id_by_messages_key_hash(
     upperbound: usize,
     messages_key: &[u8],
