This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch fix_segment_leak in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2f67133a5143a4d3026f6516c3d030875cec2e33
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 14:32:21 2026 +0100

    fix(server): memory leak in segment rotation, clear indexes and close writers
---
 Cargo.lock                                         |  1 +
 core/integration/Cargo.toml                        |  1 +
 .../scenarios/segment_rotation_race_scenario.rs    | 25 ++++++++++++++++++++++
 core/server/src/shard/system/segments.rs           | 20 +++++++++++++++--
 4 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a8a4e57bd..70ad39614 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5011,6 +5011,7 @@ dependencies = [
  "socket2 0.6.2",
  "sqlx",
  "strum",
+ "sysinfo 0.38.0",
  "tempfile",
  "test-case",
  "testcontainers-modules",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index fa57fd1be..e3c05ba82 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -67,6 +67,7 @@ server = { workspace = true }
 socket2 = { workspace = true }
 sqlx = { workspace = true }
 strum = { workspace = true }
+sysinfo = { workspace = true }
 tempfile = { workspace = true }
 test-case = { workspace = true }
 testcontainers-modules = { workspace = true }
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
index de6274402..2630262ee 100644
--- a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -33,6 +33,7 @@ use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::time::Duration;
+use sysinfo::{Pid, ProcessesToUpdate, System};
 use tokio::task::JoinSet;
 
 const STREAM_NAME: &str = "race-test-stream";
@@ -41,6 +42,7 @@ const PRODUCERS_PER_PROTOCOL: usize = 2;
 const PARTITION_ID: u32 = 0;
 const TEST_DURATION_SECS: u64 = 10;
 const MESSAGES_PER_BATCH: usize = 5;
+const MAX_ALLOWED_MEMORY_BYTES: u64 = 200 * 1024 * 1024;
 
 /// Runs the segment rotation race condition test with multiple protocols.
 /// Each client factory represents a different protocol (TCP, HTTP, QUIC, WebSocket).
@@ -54,6 +56,9 @@ pub async fn run(client_factories: &[&dyn ClientFactory]) {
     let admin_client = create_client(client_factories[0]).await;
     login_root(&admin_client).await;
 
+    let stats = admin_client.get_stats().await.unwrap();
+    let server_pid = stats.process_id;
+
     let total_producers = client_factories.len() * PRODUCERS_PER_PROTOCOL;
     init_system(&admin_client, total_producers).await;
 
@@ -111,6 +116,18 @@ pub async fn run(client_factories: &[&dyn ClientFactory]) {
     let sent = total_messages.load(Ordering::SeqCst);
     println!("Test completed successfully. Total messages sent: {}", sent);
 
+    let final_memory = get_process_memory(server_pid);
+    println!(
+        "Final server memory: {:.2} MB",
+        final_memory as f64 / 1024.0 / 1024.0
+    );
+    assert!(
+        final_memory < MAX_ALLOWED_MEMORY_BYTES,
+        "Memory leak detected! Server using {:.2} MB, max allowed is {:.2} MB",
+        final_memory as f64 / 1024.0 / 1024.0,
+        MAX_ALLOWED_MEMORY_BYTES as f64 / 1024.0 / 1024.0
+    );
+
     cleanup(&admin_client).await;
 }
 
@@ -194,3 +211,11 @@ async fn cleanup(client: &IggyClient) {
         .await
         .unwrap();
 }
+
+fn get_process_memory(pid: u32) -> u64 {
+    let mut sys = System::new();
+    sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid)]), true);
+    sys.process(Pid::from_u32(pid))
+        .map(|p| p.memory())
+        .unwrap_or(0)
+}
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 38e91fd2d..b805f3da1 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::shard::IggyShard;
 use crate::streaming::segments::Segment;
 use iggy_common::IggyError;
@@ -162,14 +163,15 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         use crate::streaming::segments::storage::create_segment_storage;
 
-        let start_offset = {
+        let (start_offset, old_segment_index) = {
             let mut partitions = self.local_partitions.borrow_mut();
             let partition = partitions
                 .get_mut(namespace)
                 .expect("rotate_segment: partition must exist");
+            let old_segment_index = partition.log.segments().len() - 1;
             let active_segment = partition.log.active_segment_mut();
             active_segment.sealed = true;
-            active_segment.end_offset + 1
+            (active_segment.end_offset + 1, old_segment_index)
         };
 
         let segment = Segment::new(start_offset, self.config.system.segment.size);
@@ -187,6 +189,20 @@ impl IggyShard {
 
         let mut partitions = self.local_partitions.borrow_mut();
         if let Some(partition) = partitions.get_mut(namespace) {
+            // Clear old segment's indexes if cache_indexes is not set to All.
+            // This prevents memory accumulation from keeping index buffers for sealed segments.
+            if !matches!(
+                self.config.system.segment.cache_indexes,
+                CacheIndexesConfig::All
+            ) {
+                partition.log.indexes_mut()[old_segment_index] = None;
+            }
+
+            // Close writers for the sealed segment - they're never needed after sealing.
+            // This releases file handles and associated kernel/io_uring resources.
+            let old_storage = &mut partition.log.storages_mut()[old_segment_index];
+            let _ = old_storage.shutdown();
+
             partition.log.add_persisted_segment(segment, storage);
             partition.stats.increment_segments_count(1);
             tracing::info!(
