This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new d98176a36 fix(server): memory leak in segment rotation (#2686)
d98176a36 is described below
commit d98176a36a565ec3a47a6a5e52869a2dd6cc36c4
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Thu Feb 5 15:12:00 2026 +0100
fix(server): memory leak in segment rotation (#2686)
Segment rotation accumulated memory indefinitely because sealed segments
retained their 16MB index buffers and kept file writers open. Under
heavy write load with frequent rotations, this caused memory to balloon
from ~100MB to 20GB+.
The fix clears index buffers for sealed segments (when cache_indexes !=
All) and closes their writers immediately after sealing. Writers are
never needed post-seal, and this also releases io_uring/kernel file
handle resources.
---
Cargo.lock | 1 +
core/integration/Cargo.toml | 1 +
.../scenarios/segment_rotation_race_scenario.rs | 25 ++++++++++++++++++++++
core/server/src/shard/system/segments.rs | 20 +++++++++++++++--
4 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index a8a4e57bd..70ad39614 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5011,6 +5011,7 @@ dependencies = [
"socket2 0.6.2",
"sqlx",
"strum",
+ "sysinfo 0.38.0",
"tempfile",
"test-case",
"testcontainers-modules",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index fa57fd1be..e3c05ba82 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -67,6 +67,7 @@ server = { workspace = true }
socket2 = { workspace = true }
sqlx = { workspace = true }
strum = { workspace = true }
+sysinfo = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
testcontainers-modules = { workspace = true }
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
index de6274402..2630262ee 100644
--- a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -33,6 +33,7 @@ use integration::test_server::{ClientFactory, login_root};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
+use sysinfo::{Pid, ProcessesToUpdate, System};
use tokio::task::JoinSet;
const STREAM_NAME: &str = "race-test-stream";
@@ -41,6 +42,7 @@ const PRODUCERS_PER_PROTOCOL: usize = 2;
const PARTITION_ID: u32 = 0;
const TEST_DURATION_SECS: u64 = 10;
const MESSAGES_PER_BATCH: usize = 5;
+const MAX_ALLOWED_MEMORY_BYTES: u64 = 200 * 1024 * 1024;
/// Runs the segment rotation race condition test with multiple protocols.
/// Each client factory represents a different protocol (TCP, HTTP, QUIC,
WebSocket).
@@ -54,6 +56,9 @@ pub async fn run(client_factories: &[&dyn ClientFactory]) {
let admin_client = create_client(client_factories[0]).await;
login_root(&admin_client).await;
+ let stats = admin_client.get_stats().await.unwrap();
+ let server_pid = stats.process_id;
+
let total_producers = client_factories.len() * PRODUCERS_PER_PROTOCOL;
init_system(&admin_client, total_producers).await;
@@ -111,6 +116,18 @@ pub async fn run(client_factories: &[&dyn ClientFactory]) {
let sent = total_messages.load(Ordering::SeqCst);
println!("Test completed successfully. Total messages sent: {}", sent);
+ let final_memory = get_process_memory(server_pid);
+ println!(
+ "Final server memory: {:.2} MB",
+ final_memory as f64 / 1024.0 / 1024.0
+ );
+ assert!(
+ final_memory < MAX_ALLOWED_MEMORY_BYTES,
+ "Memory leak detected! Server using {:.2} MB, max allowed is {:.2} MB",
+ final_memory as f64 / 1024.0 / 1024.0,
+ MAX_ALLOWED_MEMORY_BYTES as f64 / 1024.0 / 1024.0
+ );
+
cleanup(&admin_client).await;
}
@@ -194,3 +211,11 @@ async fn cleanup(client: &IggyClient) {
.await
.unwrap();
}
+
+/// Returns the memory usage of process `pid`, in bytes, as reported by
+/// `sysinfo`'s `Process::memory`.
+///
+/// # Panics
+///
+/// Panics if the process cannot be found (for example, it has already exited
+/// or is not visible from the test process). Falling back to 0 here would make
+/// the `MAX_ALLOWED_MEMORY_BYTES` assertion pass without measuring anything.
+fn get_process_memory(pid: u32) -> u64 {
+    let sys_pid = Pid::from_u32(pid);
+    let mut sys = System::new();
+    sys.refresh_processes(ProcessesToUpdate::Some(&[sys_pid]), true);
+    let Some(process) = sys.process(sys_pid) else {
+        panic!("server process {pid} not found; cannot measure its memory usage");
+    };
+    process.memory()
+}
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 38e91fd2d..b805f3da1 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -15,6 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::configs::cache_indexes::CacheIndexesConfig;
use crate::shard::IggyShard;
use crate::streaming::segments::Segment;
use iggy_common::IggyError;
@@ -162,14 +163,15 @@ impl IggyShard {
) -> Result<(), IggyError> {
use crate::streaming::segments::storage::create_segment_storage;
- let start_offset = {
+ let (start_offset, old_segment_index) = {
let mut partitions = self.local_partitions.borrow_mut();
let partition = partitions
.get_mut(namespace)
.expect("rotate_segment: partition must exist");
+ let old_segment_index = partition.log.segments().len() - 1;
let active_segment = partition.log.active_segment_mut();
active_segment.sealed = true;
- active_segment.end_offset + 1
+ (active_segment.end_offset + 1, old_segment_index)
};
let segment = Segment::new(start_offset,
self.config.system.segment.size);
@@ -187,6 +189,20 @@ impl IggyShard {
let mut partitions = self.local_partitions.borrow_mut();
if let Some(partition) = partitions.get_mut(namespace) {
+ // Clear old segment's indexes if cache_indexes is not set to All.
+ // This prevents memory accumulation from keeping index buffers
for sealed segments.
+ if !matches!(
+ self.config.system.segment.cache_indexes,
+ CacheIndexesConfig::All
+ ) {
+ partition.log.indexes_mut()[old_segment_index] = None;
+ }
+
+ // Close writers for the sealed segment - they're never needed
after sealing.
+ // This releases file handles and associated kernel/io_uring
resources.
+ let old_storage = &mut
partition.log.storages_mut()[old_segment_index];
+ let _ = old_storage.shutdown();
+
partition.log.add_persisted_segment(segment, storage);
partition.stats.increment_segments_count(1);
tracing::info!(