This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_final in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6454be16aff108f969edb89729c7641da1057ebb Author: numminex <[email protected]> AuthorDate: Thu Nov 6 17:00:17 2025 +0100 bring back the message cleaner task --- core/configs/server.toml | 45 +-- core/server/src/configs/defaults.rs | 1 - core/server/src/configs/displays.rs | 32 +- core/server/src/configs/server.rs | 16 - core/server/src/configs/validators.rs | 2 +- core/server/src/shard/mod.rs | 4 + .../src/shard/tasks/periodic/message_cleaner.rs | 392 +++++++++++++++++++++ core/server/src/shard/tasks/periodic/mod.rs | 2 + core/server/src/state/file.rs | 2 - .../src/streaming/segments/indexes/index_reader.rs | 4 + .../streaming/segments/messages/messages_reader.rs | 4 + core/server/src/streaming/segments/storage.rs | 6 + core/server/src/streaming/topics/helpers.rs | 24 ++ 13 files changed, 444 insertions(+), 90 deletions(-) diff --git a/core/configs/server.toml b/core/configs/server.toml index ea1be6c66..ef7fa96be 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -15,45 +15,12 @@ # specific language governing permissions and limitations # under the License. -[data_maintenance.archiver] -# Enables or disables the archiver process. -enabled = false - -# Kind of archiver to use. Available options: "disk". -kind = "disk" - -[data_maintenance.archiver.disk] -# Path for storing the archived data on disk. -path = "local_data/archive" - -[data_maintenance.archiver.s3] -# Access key ID for the S3 bucket. -key_id = "123" - -# Secret access key for the S3 bucket -key_secret = "secret" - -# Name of the S3 bucket. -bucket = "iggy" - -# Endpoint of the S3 region. -endpoint = "http://localhost:9000" - -# Region of the S3 bucket. -region = "eu-west-1" - -# Temporary directory for storing the data before uploading to S3. -tmp_upload_dir = "local_data/s3_tmp" - [data_maintenance.messages] -# Enables or disables the archiver process for closed segments containing messages. -archiver_enabled = false - # Enables or disables the expired message cleaner process. cleaner_enabled = false # Interval for running the message archiver and cleaner. -interval = "35s" +interval = "1 m" [data_maintenance.state] # Enables or disables the archiver process for state log. @@ -285,16 +252,6 @@ send_buffer_size = "64 KB" # SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection keepalive = false -# Message cleaner configuration. -[message_cleaner] -# Enables or disables the background process for deleting expired messages. -# `true` activates the message cleaner. -# `false` turns it off, messages will not be auto-deleted based on expiry. -enabled = true - -# Interval for running the message cleaner. -interval = "1 m" - # Message saver configuration. [message_saver] # Enables or disables the background process for saving buffered data to disk. diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index 7aed1e606..2b66dc7f3 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -67,7 +67,6 @@ impl Default for ServerConfig { impl Default for MessagesMaintenanceConfig { fn default() -> MessagesMaintenanceConfig { MessagesMaintenanceConfig { - archiver_enabled: SERVER_CONFIG.data_maintenance.messages.archiver_enabled, cleaner_enabled: SERVER_CONFIG.data_maintenance.messages.cleaner_enabled, interval: SERVER_CONFIG .data_maintenance diff --git a/core/server/src/configs/displays.rs b/core/server/src/configs/displays.rs index fe66dbedd..c70aeffb5 100644 --- a/core/server/src/configs/displays.rs +++ b/core/server/src/configs/displays.rs @@ -18,9 +18,8 @@ use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; use crate::configs::server::{ - DataMaintenanceConfig, DiskArchiverConfig, HeartbeatConfig, MessagesMaintenanceConfig, - S3ArchiverConfig, StateMaintenanceConfig, TelemetryConfig, TelemetryLogsConfig, - TelemetryTracesConfig, + DataMaintenanceConfig, HeartbeatConfig, MessagesMaintenanceConfig, StateMaintenanceConfig, + TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, }; use crate::configs::system::MessageDeduplicationConfig; use crate::configs::{ @@ -145,31 +144,12 @@ impl Display for DataMaintenanceConfig { } } -impl Display for DiskArchiverConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{{ path: {} }}", self.path) - } -} - -impl Display for S3ArchiverConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{{ key_id: {}, bucket: {}, endpoint: {}. region: {} }}", - self.key_id, - self.bucket, - self.endpoint.as_deref().unwrap_or_default(), - self.region.as_deref().unwrap_or_default() - ) - } -} - impl Display for MessagesMaintenanceConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ archiver_enabled: {}, cleaner_enabled: {}, interval: {} }}", - self.archiver_enabled, self.cleaner_enabled, self.interval + "{{ cleaner_enabled: {}, interval: {} }}", + self.cleaner_enabled, self.interval ) } } @@ -178,8 +158,8 @@ impl Display for StateMaintenanceConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ archiver_enabled: {}, overwrite: {}, interval: {} }}", - self.archiver_enabled, self.overwrite, self.interval + "{{ overwrite: {}, interval: {} }}", + self.overwrite, self.interval ) } } diff --git a/core/server/src/configs/server.rs b/core/server/src/configs/server.rs index 26b6ef033..18c095ebf 100644 --- a/core/server/src/configs/server.rs +++ b/core/server/src/configs/server.rs @@ -60,7 +60,6 @@ pub struct DataMaintenanceConfig { #[serde_as] #[derive(Debug, Deserialize, Serialize, Clone)] pub struct MessagesMaintenanceConfig { - pub archiver_enabled: bool, pub cleaner_enabled: bool, #[serde_as(as = "DisplayFromStr")] pub interval: IggyDuration, @@ -75,21 +74,6 @@ pub struct StateMaintenanceConfig { pub interval: IggyDuration, } -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct DiskArchiverConfig { - pub path: String, -} - -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct S3ArchiverConfig { - pub key_id: String, - pub key_secret: String, - pub bucket: String, - pub endpoint: Option<String>, - pub region: Option<String>, - pub tmp_upload_dir: String, -} - #[serde_as] #[derive(Debug, Deserialize, Serialize, Clone)] pub struct MessageSaverConfig { diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs index 9f1e4eea7..2d4d05f15 100644 --- a/core/server/src/configs/validators.rs +++ b/core/server/src/configs/validators.rs @@ -212,7 +212,7 @@ impl Validatable<ConfigError> for DataMaintenanceConfig { impl Validatable<ConfigError> for MessagesMaintenanceConfig { fn validate(&self) -> Result<(), ConfigError> { - if self.archiver_enabled && self.interval.is_zero() { + if self.cleaner_enabled && self.interval.is_zero() { return Err(ConfigError::InvalidConfiguration); } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 05287c037..9d272c417 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -146,6 +146,10 @@ impl IggyShard { periodic::spawn_message_saver(self.clone()); } + if self.config.data_maintenance.messages.cleaner_enabled { + periodic::spawn_message_cleaner(self.clone()); + } + if self.config.heartbeat.enabled { periodic::spawn_heartbeat_verifier(self.clone()); } diff --git a/core/server/src/shard/tasks/periodic/message_cleaner.rs b/core/server/src/shard/tasks/periodic/message_cleaner.rs new file mode 100644 index 000000000..54f6f0f10 --- /dev/null +++ b/core/server/src/shard/tasks/periodic/message_cleaner.rs @@ -0,0 +1,392 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::shard::IggyShard; +use crate::streaming::topics::helpers as topics_helpers; +use iggy_common::{Identifier, IggyError, IggyTimestamp}; +use std::rc::Rc; +use tracing::{debug, error, info, trace, warn}; + +pub fn spawn_message_cleaner(shard: Rc<IggyShard>) { + if !shard.config.data_maintenance.messages.cleaner_enabled { + info!("Message cleaner is disabled."); + return; + } + + let period = shard + .config + .data_maintenance + .messages + .interval + .get_duration(); + info!( + "Message cleaner is enabled, expired segments will be automatically deleted every: {:?}", + period + ); + let shard_clone = shard.clone(); + shard + .task_registry + .periodic("clean_messages") + .every(period) + .tick(move |_shutdown| clean_expired_messages(shard_clone.clone())) + .spawn(); +} + +async fn clean_expired_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { + trace!("Cleaning expired messages..."); + + let namespaces = shard.get_current_shard_namespaces(); + let now = IggyTimestamp::now(); + let delete_oldest_segments = shard.config.system.topic.delete_oldest_segments; + + let mut topics: std::collections::HashMap<(usize, usize), Vec<usize>> = + std::collections::HashMap::new(); + + for ns in namespaces { + let stream_id = ns.stream_id(); + let topic_id = ns.topic_id(); + topics + .entry((stream_id, topic_id)) + .or_default() + .push(ns.partition_id()); + } + + let mut total_deleted_segments = 0u64; + let mut total_deleted_messages = 0u64; + + for ((stream_id, topic_id), partition_ids) in topics { + let stream_identifier = Identifier::numeric(stream_id as u32).unwrap(); + let topic_identifier = Identifier::numeric(topic_id as u32).unwrap(); + + let mut topic_deleted_segments = 0u64; + let mut topic_deleted_messages = 0u64; + + for partition_id in partition_ids { + // Handle expired segments + let expired_result = handle_expired_segments( + &shard, + &stream_identifier, + &topic_identifier, + partition_id, + now, + ) + .await; + + match expired_result { + Ok(deleted) => { + topic_deleted_segments += deleted.segments_count; + topic_deleted_messages += deleted.messages_count; + } + Err(err) => { + error!( + "Failed to clean expired segments for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}", + stream_id, topic_id, partition_id, err + ); + } + } + + // Handle oldest segments if topic size management is enabled + if delete_oldest_segments { + let oldest_result = handle_oldest_segments( + &shard, + &stream_identifier, + &topic_identifier, + partition_id, + ) + .await; + + match oldest_result { + Ok(deleted) => { + topic_deleted_segments += deleted.segments_count; + topic_deleted_messages += deleted.messages_count; + } + Err(err) => { + error!( + "Failed to clean oldest segments for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}", + stream_id, topic_id, partition_id, err + ); + } + } + } + } + + if topic_deleted_segments > 0 { + info!( + "Deleted {} segments and {} messages for stream ID: {}, topic ID: {}", + topic_deleted_segments, topic_deleted_messages, stream_id, topic_id + ); + total_deleted_segments += topic_deleted_segments; + total_deleted_messages += topic_deleted_messages; + + // Update metrics + shard + .metrics + .decrement_segments(topic_deleted_segments as u32); + shard.metrics.decrement_messages(topic_deleted_messages); + } else { + trace!( + "No segments were deleted for stream ID: {}, topic ID: {}", + stream_id, topic_id + ); + } + } + + if total_deleted_segments > 0 { + info!( + "Total cleaned: {} segments and {} messages", + total_deleted_segments, total_deleted_messages + ); + } + + Ok(()) +} + +#[derive(Debug, Default)] +struct DeletedSegments { + pub segments_count: u64, + pub messages_count: u64, +} + +async fn handle_expired_segments( + shard: &Rc<IggyShard>, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, + now: IggyTimestamp, +) -> Result<DeletedSegments, IggyError> { + // Get expired segments + let expired_segment_offsets = + shard + .streams + .with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + let mut expired = Vec::new(); + for segment in log.segments() { + if segment.is_expired(now) { + expired.push(segment.start_offset); + } + } + expired + }); + + if expired_segment_offsets.is_empty() { + return Ok(DeletedSegments::default()); + } + + debug!( + "Found {} expired segments for stream ID: {}, topic ID: {}, partition ID: {}", + expired_segment_offsets.len(), + stream_id, + topic_id, + partition_id + ); + + delete_segments( + shard, + stream_id, + topic_id, + partition_id, + &expired_segment_offsets, + ) + .await +} + +async fn handle_oldest_segments( + shard: &Rc<IggyShard>, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, +) -> Result<DeletedSegments, IggyError> { + let topic_info = + shard + .streams + .with_topic_by_id(stream_id, topic_id, topics_helpers::get_topic_size_info()); + + let (is_unlimited, is_almost_full) = topic_info; + + if is_unlimited { + debug!( + "Topic is unlimited, oldest segments will not be deleted for stream ID: {}, topic ID: {}, partition ID: {}", + stream_id, topic_id, partition_id + ); + return Ok(DeletedSegments::default()); + } + + if !is_almost_full { + debug!( + "Topic is not almost full, oldest segments will not be deleted for stream ID: {}, topic ID: {}, partition ID: {}", + stream_id, topic_id, partition_id + ); + return Ok(DeletedSegments::default()); + } + + let oldest_segment_offset = + shard + .streams + .with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + let segments = log.segments(); + // Find the first closed segment (not the active one) + if segments.len() > 1 { + // The last segment is always active, so we look at earlier ones + segments.first().map(|s| s.start_offset) + } else { + None + } + }); + + if let Some(start_offset) = oldest_segment_offset { + info!( + "Deleting oldest segment with start offset {} for stream ID: {}, topic ID: {}, partition ID: {}", + start_offset, stream_id, topic_id, partition_id + ); + + delete_segments(shard, stream_id, topic_id, partition_id, &[start_offset]).await + } else { + debug!( + "No closed segments found to delete for stream ID: {}, topic ID: {}, partition ID: {}", + stream_id, topic_id, partition_id + ); + Ok(DeletedSegments::default()) + } +} + +async fn delete_segments( + shard: &Rc<IggyShard>, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, + segment_offsets: &[u64], +) -> Result<DeletedSegments, IggyError> { + if segment_offsets.is_empty() { + return Ok(DeletedSegments::default()); + } + + info!( + "Deleting {} segments for stream ID: {}, topic ID: {}, partition ID: {}...", + segment_offsets.len(), + stream_id, + topic_id, + partition_id + ); + + let mut segments_count = 0u64; + let mut messages_count = 0u64; + + let to_delete = shard.streams.with_partition_by_id_mut( + stream_id, + topic_id, + partition_id, + |(_, stats, .., log)| { + let mut segments_to_remove = Vec::new(); + let mut storages_to_remove = Vec::new(); + + let mut indices_to_remove: Vec<usize> = Vec::new(); + for &start_offset in segment_offsets { + if let Some(idx) = log + .segments() + .iter() + .position(|s| s.start_offset == start_offset) + { + indices_to_remove.push(idx); + } + } + + indices_to_remove.sort_by(|a, b| b.cmp(a)); + for idx in indices_to_remove { + let segment = log.segments_mut().remove(idx); + let storage = log.storages_mut().remove(idx); + log.indexes_mut().remove(idx); + + segments_to_remove.push(segment); + storages_to_remove.push(storage); + } + + (stats.clone(), segments_to_remove, storages_to_remove) + }, + ); + + let (stats, segments_to_delete, mut storages_to_delete) = to_delete; + + for (segment, storage) in segments_to_delete + .into_iter() + .zip(storages_to_delete.iter_mut()) + { + let segment_size = segment.size.as_bytes_u64(); + let start_offset = segment.start_offset; + let end_offset = segment.end_offset; + + let approx_messages = if (end_offset - start_offset) == 0 { + 0 + } else { + (end_offset - start_offset) + 1 + }; + + let _ = storage.shutdown(); + let (messages_path, index_path) = storage.segment_and_index_paths(); + + if let Some(path) = messages_path { + if let Err(e) = compio::fs::remove_file(&path).await { + error!("Failed to delete messages file {}: {}", path, e); + } else { + trace!("Deleted messages file: {}", path); + } + } else { + warn!( + "Messages writer path not found for segment starting at offset {}", + start_offset + ); + } + + if let Some(path) = index_path { + if let Err(e) = compio::fs::remove_file(&path).await { + error!("Failed to delete index file {}: {}", path, e); + } else { + trace!("Deleted index file: {}", path); + } + + let time_index_path = path.replace(".index", ".timeindex"); + if let Err(e) = compio::fs::remove_file(&time_index_path).await { + trace!( + "Could not delete time index file {}: {}", + time_index_path, e + ); + } + } else { + warn!( + "Index writer path not found for segment starting at offset {}", + start_offset + ); + } + + stats.decrement_size_bytes(segment_size); + stats.decrement_segments_count(1); + stats.decrement_messages_count(messages_count); + + info!( + "Deleted segment with start offset {} (end: {}, size: {}, messages: {}) from partition ID: {}", + start_offset, end_offset, segment_size, approx_messages, partition_id + ); + + segments_count += 1; + messages_count += approx_messages; + } + + Ok(DeletedSegments { + segments_count, + messages_count, + }) +} diff --git a/core/server/src/shard/tasks/periodic/mod.rs b/core/server/src/shard/tasks/periodic/mod.rs index 9689060ae..d0694ffb7 100644 --- a/core/server/src/shard/tasks/periodic/mod.rs +++ b/core/server/src/shard/tasks/periodic/mod.rs @@ -18,12 +18,14 @@ mod heartbeat_verifier; mod jwt_token_cleaner; +mod message_cleaner; mod message_saver; mod personal_access_token_cleaner; mod sysinfo_printer; pub use heartbeat_verifier::spawn_heartbeat_verifier; pub use jwt_token_cleaner::spawn_jwt_token_cleaner; +pub use message_cleaner::spawn_message_cleaner; pub use message_saver::spawn_message_saver; pub use personal_access_token_cleaner::spawn_personal_access_token_cleaner; pub use sysinfo_printer::spawn_sysinfo_printer; diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index 4d978e00d..3109d5f24 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -151,7 +151,6 @@ impl FileState { .await .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; - tracing::warn!("index: {}", index); total_size += 8; // Greater than one, because one of the entries after a fresh reboot is the default root user. if entries_count > 1 && index != current_index + 1 { @@ -264,7 +263,6 @@ impl FileState { EntryCommand::from_bytes(command.clone()).with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to parse entry command from bytes") })?; - tracing::warn!("command: {:?}", command); let calculated_checksum = StateEntry::calculate_checksum( index, term, leader_id, version, flags, timestamp, user_id, &context, &command, ); diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs index f37862beb..d58035e18 100644 --- a/core/server/src/streaming/segments/indexes/index_reader.rs +++ b/core/server/src/streaming/segments/indexes/index_reader.rs @@ -65,6 +65,10 @@ impl IndexReader { }) } + pub fn path(&self) -> String { + self.file_path.clone() + } + /// Loads all indexes from the index file into the optimized binary format. /// Note that this function does not use the pool, as the messages are not cached. /// This is expected - this method is called at startup and we want to preserve diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs index 963510a60..3a419947b 100644 --- a/core/server/src/streaming/segments/messages/messages_reader.rs +++ b/core/server/src/streaming/segments/messages/messages_reader.rs @@ -84,6 +84,10 @@ impl MessagesReader { }) } + pub fn path(&self) -> String { + self.file_path.clone() + } + /// Loads and returns all message IDs from the messages file. /// Note that this function does not use the pool, as the messages are not cached. /// This is expected - this method is called at startup and we want to preserve diff --git a/core/server/src/streaming/segments/storage.rs b/core/server/src/streaming/segments/storage.rs index b21936650..9b04d10bb 100644 --- a/core/server/src/streaming/segments/storage.rs +++ b/core/server/src/streaming/segments/storage.rs @@ -75,6 +75,12 @@ impl Storage { let index_writer = self.index_writer.take(); (messages_writer, index_writer) } + + pub fn segment_and_index_paths(&self) -> (Option<String>, Option<String>) { + let index_path = self.index_reader.as_ref().map(|reader| reader.path()); + let segment_path = self.messages_reader.as_ref().map(|reader| reader.path()); + (segment_path, index_path) + } } /// Creates a new storage for the specified partition with the given start offset diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs index 433d03ac0..1e0707d1b 100644 --- a/core/server/src/streaming/topics/helpers.rs +++ b/core/server/src/streaming/topics/helpers.rs @@ -80,6 +80,30 @@ pub fn get_max_topic_size() -> impl FnOnce(ComponentsById<TopicRef>) -> MaxTopic |(root, _, _)| root.max_topic_size() } +pub fn get_topic_size_info() -> impl FnOnce(ComponentsById<TopicRef>) -> (bool, bool) { + |(root, _, stats)| { + let max_size = root.max_topic_size(); + let current_size = stats.size_bytes_inconsistent(); + let is_unlimited = matches!(max_size, MaxTopicSize::Unlimited); + let is_almost_full = if !is_unlimited { + let max_bytes = max_size.as_bytes_u64(); + // Consider "almost full" as 90% capacity + current_size >= (max_bytes * 9 / 10) + } else { + false + }; + tracing::warn!( + "Topic ID: {}, current_size: {}, max_size: {:?}, is_unlimited: {}, is_almost_full: {}", + root.id(), + current_size, + max_size, + is_unlimited, + is_almost_full + ); + (is_unlimited, is_almost_full) + } +} + pub fn calculate_partition_id_by_messages_key_hash( upperbound: usize, messages_key: &[u8],
