This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 873b63965 feat(server): implement log rotation based on size and 
retention (#2452)
873b63965 is described below

commit 873b63965998e07ed7b50994286d1397b6359dd5
Author: Svecco <[email protected]>
AuthorDate: Mon Feb 2 17:43:48 2026 +0800

    feat(server): implement log rotation based on size and retention (#2452)
    
    - implemented log rotation based on size and retention as the title;
    - implemented configurable attributes and imported breaking changes;
    - added units and integration test in logger.rs and integration mod;
    - added documentations and imported new dependencies, etc.
---
 Cargo.lock                                         |  21 +
 DEPENDENCIES.md                                    |   2 +
 core/common/src/utils/byte_size.rs                 |   7 +
 core/common/src/utils/duration.rs                  |  29 ++
 core/integration/src/test_server.rs                |   4 +-
 .../server/scenarios/log_rotation_scenario.rs      | 382 +++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/integration/tests/server/specific.rs          |   5 +-
 core/server/Cargo.toml                             |   2 +
 core/server/config.toml                            |  22 +-
 core/server/src/configs/defaults.rs                |   9 +-
 core/server/src/configs/displays.rs                |   8 +-
 core/server/src/configs/system.rs                  |   7 +-
 core/server/src/configs/validators.rs              |  47 +-
 core/server/src/log/logger.rs                      | 475 ++++++++++++++++++++-
 foreign/cpp/tests/e2e/server.toml                  |  26 +-
 16 files changed, 1009 insertions(+), 38 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b43d62fe0..f2806dd02 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3302,6 +3302,16 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "fs2"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "fs_extra"
 version = "1.3.0"
@@ -7642,6 +7652,15 @@ dependencies = [
  "byteorder",
 ]
 
+[[package]]
+name = "rolling-file"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8395b4f860856b740f20a296ea2cd4d823e81a2658cf05ef61be22916026a906"
+dependencies = [
+ "chrono",
+]
+
 [[package]]
 name = "route-recognizer"
 version = "0.3.1"
@@ -8273,6 +8292,7 @@ dependencies = [
  "figlet-rs",
  "figment",
  "flume 0.12.0",
+ "fs2",
  "futures",
  "hash32 1.0.0",
  "human-repr",
@@ -8297,6 +8317,7 @@ dependencies = [
  "reqwest",
  "ringbuffer",
  "rmp-serde",
+ "rolling-file",
  "rust-embed",
  "rustls",
  "rustls-pemfile",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index a7fd8c256..924cbd10d 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -293,6 +293,7 @@ foreign-types-shared: 0.1.1, "Apache-2.0 OR MIT",
 form_urlencoded: 1.2.2, "Apache-2.0 OR MIT",
 fragile: 2.0.1, "Apache-2.0",
 fs-err: 3.2.2, "Apache-2.0 OR MIT",
+fs2: 0.4.3, "Apache-2.0 OR MIT",
 fs_extra: 1.3.0, "MIT",
 fsevent-sys: 4.1.0, "MIT",
 funty: 2.0.0, "MIT",
@@ -661,6 +662,7 @@ rmcp-macros: 0.14.0, "Apache-2.0",
 rmp: 0.8.15, "MIT",
 rmp-serde: 1.3.1, "MIT",
 roaring: 0.11.3, "Apache-2.0 OR MIT",
+rolling-file: 0.2.0, "Apache-2.0 OR MIT",
 route-recognizer: 0.3.1, "MIT",
 rsa: 0.9.10, "Apache-2.0 OR MIT",
 rust-embed: 8.11.0, "MIT",
diff --git a/core/common/src/utils/byte_size.rs 
b/core/common/src/utils/byte_size.rs
index 2ccf58bee..230f8b484 100644
--- a/core/common/src/utils/byte_size.rs
+++ b/core/common/src/utils/byte_size.rs
@@ -137,6 +137,13 @@ impl IggyByteSize {
         format!("{:.2}", self.0.get_appropriate_unit(UnitType::Decimal))
     }
 
+    /// Subtract another IggyByteSize value, return 0 if the result is 
negative.
+    pub fn saturating_sub(&self, other: &Self) -> Self {
+        let self_bytes = self.as_bytes_u64();
+        let other_bytes = other.as_bytes_u64();
+        IggyByteSize::new(self_bytes.saturating_sub(other_bytes))
+    }
+
     /// Calculates the throughput based on the provided duration and returns a 
human-readable string.
     pub(crate) fn _as_human_throughput_string(&self, duration: &IggyDuration) 
-> String {
         if duration.is_zero() {
diff --git a/core/common/src/utils/duration.rs 
b/core/common/src/utils/duration.rs
index cb256808a..aa6c796a7 100644
--- a/core/common/src/utils/duration.rs
+++ b/core/common/src/utils/duration.rs
@@ -29,6 +29,35 @@ use std::{
 
 pub const SEC_IN_MICRO: u64 = 1_000_000;
 
+/// A struct for representing time durations with various utility functions.
+///
+/// This struct wraps `std::time::Duration` and uses the `humantime` crate for 
parsing and formatting
+/// human-readable duration strings. It also implements serialization and 
deserialization via the `serde` crate.
+///
+/// # Example
+///
+/// ```
+/// use iggy_common::IggyDuration;
+/// use std::str::FromStr;
+///
+/// let duration = IggyDuration::from(3661_000_000_u64); // 3661 seconds in 
microseconds
+/// assert_eq!(3661, duration.as_secs());
+/// assert_eq!("1h 1m 1s", duration.as_human_time_string());
+/// assert_eq!("1h 1m 1s", format!("{}", duration));
+///
+/// let duration = IggyDuration::from(0_u64);
+/// assert_eq!(0, duration.as_secs());
+/// assert_eq!("0s", duration.as_human_time_string());
+/// assert_eq!("0s", format!("{}", duration));
+///
+/// let duration = IggyDuration::from_str("1h 1m 1s").unwrap();
+/// assert_eq!(3661, duration.as_secs());
+/// assert_eq!("1h 1m 1s", duration.as_human_time_string());
+/// assert_eq!("1h 1m 1s", format!("{}", duration));
+///
+/// let duration = IggyDuration::from_str("unlimited").unwrap();
+/// assert_eq!(0, duration.as_secs());
+/// ```
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub struct IggyDuration {
     duration: Duration,
diff --git a/core/integration/src/test_server.rs 
b/core/integration/src/test_server.rs
index a58ad85a3..81c68b619 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -120,9 +120,9 @@ impl TestServer {
             Ok(parallelism) => {
                 let available_cpus = parallelism.get();
                 if available_cpus >= 4 {
-                    let mut rng = rand::thread_rng();
+                    let mut rng = rand::rng();
                     let max_start = available_cpus - 4;
-                    let start = rng.gen_range(0..=max_start);
+                    let start = rng.random_range(0..=max_start);
                     let end = start + 4;
                     format!("{}..{}", start, end)
                 } else {
diff --git a/core/integration/tests/server/scenarios/log_rotation_scenario.rs 
b/core/integration/tests/server/scenarios/log_rotation_scenario.rs
new file mode 100644
index 000000000..69ccccd41
--- /dev/null
+++ b/core/integration/tests/server/scenarios/log_rotation_scenario.rs
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{PARTITIONS_COUNT, STREAM_NAME, TOPIC_NAME};
+use iggy::prelude::*;
+use iggy_common::{
+    CompressionAlgorithm, Identifier, IggyByteSize, IggyDuration, IggyExpiry, 
MaxTopicSize,
+};
+use integration::tcp_client::TcpClientFactory;
+use integration::test_server::{ClientFactory, IpAddrKind, TestServer, 
login_root};
+use once_cell::sync::Lazy;
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::path::Path;
+use std::time::Duration;
+use test_case::test_matrix;
+use tokio::fs;
+use tokio::sync::Mutex;
+use tokio::time::{sleep, timeout};
+
+const RETENTION_SECS: u64 = 30;
+const OPERATION_TIMEOUT_SECS: u64 = 10;
+const OPERATION_LOOP_COUNT: usize = 300;
+const FROM_BYTES_TO_KB: u64 = 1000;
+const IGGY_LOG_BASE_NAME: &str = "iggy-server.log";
+
+static PRINT_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
+
+#[derive(Debug)]
+pub struct LogRotationTestConfig {
+    pub name: String,
+    pub max_single_log_size: IggyByteSize,
+    pub max_total_log_size: IggyByteSize,
+    pub rotation_check_interval: IggyDuration,
+    pub retention: IggyDuration,
+}
+
+fn config_regular_rotation() -> LogRotationTestConfig {
+    LogRotationTestConfig {
+        name: "log_regular_rotation".to_string(),
+        max_single_log_size: IggyByteSize::new(100_000),
+        max_total_log_size: IggyByteSize::new(400_000),
+        rotation_check_interval: IggyDuration::ONE_SECOND,
+        retention: IggyDuration::new_from_secs(RETENTION_SECS),
+    }
+}
+
+fn config_unlimited_size() -> LogRotationTestConfig {
+    LogRotationTestConfig {
+        name: "log_unlimited_size".to_string(),
+        max_single_log_size: IggyByteSize::new(0),
+        max_total_log_size: IggyByteSize::new(400_000),
+        rotation_check_interval: IggyDuration::ONE_SECOND,
+        retention: IggyDuration::new_from_secs(RETENTION_SECS),
+    }
+}
+
+fn config_unlimited_archives() -> LogRotationTestConfig {
+    LogRotationTestConfig {
+        name: "log_unlimited_archives".to_string(),
+        max_single_log_size: IggyByteSize::new(100_000),
+        max_total_log_size: IggyByteSize::new(0),
+        rotation_check_interval: IggyDuration::ONE_SECOND,
+        retention: IggyDuration::new_from_secs(RETENTION_SECS),
+    }
+}
+
+fn config_special_scenario() -> LogRotationTestConfig {
+    LogRotationTestConfig {
+        name: "log_special_scenario".to_string(),
+        max_single_log_size: IggyByteSize::new(0),
+        max_total_log_size: IggyByteSize::new(0),
+        rotation_check_interval: IggyDuration::ONE_SECOND,
+        retention: IggyDuration::new_from_secs(RETENTION_SECS),
+    }
+}
+
+#[test_matrix(
+    [config_regular_rotation(), config_unlimited_size(), 
config_unlimited_archives(), config_special_scenario()]
+)]
+#[tokio::test]
+#[parallel]
+async fn log_rotation_should_be_valid(present_log_config: 
LogRotationTestConfig) {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert(
+        "IGGY_SYSTEM_LOGGING_MAX_FILE_SIZE".to_string(),
+        format!("{}", present_log_config.max_single_log_size),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_LOGGING_MAX_TOTAL_SIZE".to_string(),
+        format!("{}", present_log_config.max_total_log_size),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_LOGGING_ROTATION_CHECK_INTERVAL".to_string(),
+        format!("{}", present_log_config.rotation_check_interval),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_LOGGING_RETENTION".to_string(),
+        format!("{}", present_log_config.retention),
+    );
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    let log_dir = format!("{}/logs", test_server.get_local_data_path());
+
+    test_server.assert_running();
+    run(&client_factory, &log_dir, present_log_config).await;
+}
+
+async fn run(
+    client_factory: &dyn ClientFactory,
+    log_dir: &str,
+    present_log_config: LogRotationTestConfig,
+) {
+    let done_status = false;
+    let present_log_test_title = present_log_config.name.clone();
+    let log_path = Path::new(log_dir);
+    assert!(
+        log_path.exists() && log_path.is_dir(),
+        "failed::no_such_directory => {log_dir}",
+    );
+
+    let client = init_valid_client(client_factory).await;
+    assert!(
+        client.is_ok(),
+        "failed::client_initialize => {:?}",
+        client.as_ref().err(),
+    );
+
+    let generator_result = 
generate_enough_logs(client.as_ref().unwrap()).await;
+    assert!(
+        generator_result.is_ok(),
+        "failed::generate_logs => {:?}",
+        generator_result.as_ref().err(),
+    );
+
+    nocapture_observer(log_path, &present_log_test_title, done_status).await;
+    sleep(present_log_config.rotation_check_interval.get_duration()).await;
+
+    let rotation_result = validate_log_rotation_rules(log_path, 
present_log_config).await;
+    assert!(
+        rotation_result.is_ok(),
+        "failed::rotation_check => {:?}",
+        rotation_result.as_ref().err(),
+    );
+
+    nocapture_observer(log_path, &present_log_test_title, !done_status).await;
+}
+
+async fn init_valid_client(client_factory: &dyn ClientFactory) -> 
Result<IggyClient, String> {
+    let operation_timeout = 
IggyDuration::new(Duration::from_secs(OPERATION_TIMEOUT_SECS));
+    let client_wrapper = timeout(
+        operation_timeout.get_duration(),
+        client_factory.create_client(),
+    )
+    .await
+    .map_err(|_| "ClientWrapper creation timed out")?;
+
+    timeout(operation_timeout.get_duration(), client_wrapper.connect())
+        .await
+        .map_err(|_| "Client connection timed out")?
+        .map_err(|e| format!("Client connection failed: {e:?}"))?;
+
+    let client = IggyClient::create(client_wrapper, None, None);
+    timeout(operation_timeout.get_duration(), login_root(&client))
+        .await
+        .map_err(|e| format!("Root user login timed out: {e:?}"))?;
+
+    Ok(client)
+}
+
+/// Loop through the creation and deletion of streams / topics
+/// to trigger server business operations,  thereby generating
+/// sufficient log data to meet the trigger conditions for the
+/// log rotation test.
+async fn generate_enough_logs(client: &IggyClient) -> Result<(), String> {
+    for i in 0..OPERATION_LOOP_COUNT {
+        let stream_name = format!("{STREAM_NAME}-{i}");
+        let topic_name = format!("{TOPIC_NAME}-{i}");
+
+        client
+            .create_stream(&stream_name)
+            .await
+            .map_err(|e| format!("Failed to create {stream_name}: {e}"))?;
+
+        let stream_identifier = Identifier::named(&stream_name)
+            .map_err(|e| format!("Failed to create stream label {e}"))?;
+
+        client
+            .create_topic(
+                &stream_identifier,
+                &topic_name,
+                PARTITIONS_COUNT,
+                CompressionAlgorithm::default(),
+                None,
+                IggyExpiry::NeverExpire,
+                MaxTopicSize::Unlimited,
+            )
+            .await
+            .map_err(|e| format!("Failed to create topic {topic_name}: {e}"))?;
+
+        client
+            .delete_stream(&stream_identifier)
+            .await
+            .map_err(|e| format!("Failed to remove stream {stream_name}: 
{e}"))?;
+    }
+
+    Ok(())
+}
+
+async fn validate_log_rotation_rules(
+    log_dir: &Path,
+    present_log_config: LogRotationTestConfig,
+) -> Result<(), String> {
+    let log_dir_display = log_dir.display();
+    let mut dir_entries = fs::read_dir(log_dir)
+        .await
+        .map_err(|e| format!("Failed to read log directory 
'{log_dir_display}': {e}",))?;
+
+    let mut valid_log_files = Vec::new();
+    while let Some(entry) = dir_entries.next_entry().await.map_err(|e| {
+        format!("Failed to read next entry in log directory 
'{log_dir_display}': {e}",)
+    })? {
+        let file_path = entry.path();
+
+        if !file_path.is_file() {
+            continue;
+        }
+
+        let file_name = match file_path.file_name().and_then(|name| 
name.to_str()) {
+            Some(name) => name,
+            None => continue,
+        };
+
+        if is_valid_iggy_log_file(file_name) {
+            valid_log_files.push(file_path);
+        }
+    }
+
+    if valid_log_files.is_empty() {
+        return Err(format!(
+            "No valid Iggy log files found in directory '{}'. Expected files 
matching '{}' (original) or '{}.<numeric>' (archived).",
+            log_dir_display, IGGY_LOG_BASE_NAME, IGGY_LOG_BASE_NAME
+        ));
+    }
+
+    // logger.rs => tracing_appender::non_blocking(file_appender);
+    // The delay in log writing in Iggy mainly depends on the processing speed
+    // of background threads and the operating system's I/O scheduling,  which
+    // means that the actual size of written logs may be slightly larger  than
+    // expected. So there ignores tiny minor overflow by comparing integer  KB
+    // values instead of exact bytes.
+
+    let mut total_log_size = IggyByteSize::new(0);
+    let max_single_kb = present_log_config.max_single_log_size.as_bytes_u64() 
/ FROM_BYTES_TO_KB;
+    let max_total_kb = present_log_config.max_total_log_size.as_bytes_u64() / 
FROM_BYTES_TO_KB;
+    let present_file_amount = valid_log_files.len();
+
+    if max_single_kb == 0 && present_file_amount > 1 {
+        return Err(format!(
+            "Log size should be unlimited if `max_file_size` is set to 0, 
found {} files.",
+            present_file_amount,
+        ));
+    } else if max_total_kb == 0 && max_single_kb != 0 {
+        if present_file_amount as u64 <= 1 {
+            return Err(format!(
+                "Archives should be unlimited if `max_total_size` is set to 0, 
found {} files.",
+                present_file_amount,
+            ));
+        }
+    } else {
+        for log_file in valid_log_files {
+            let file_metadata = fs::metadata(&log_file).await.map_err(|e| {
+                format!(
+                    "Failed to get metadata for file '{}': {}",
+                    log_file.display(),
+                    e
+                )
+            })?;
+
+            let file_size_bytes = file_metadata.len();
+            if max_single_kb != 0 {
+                let current_single_kb = file_size_bytes / FROM_BYTES_TO_KB;
+                if current_single_kb > max_single_kb {
+                    return Err(format!(
+                        "Single log file exceeds maximum allowed size: '{}'",
+                        log_file.display()
+                    ));
+                }
+            }
+
+            total_log_size += IggyByteSize::new(file_size_bytes);
+        }
+    }
+
+    let current_total_kb = total_log_size.as_bytes_u64() / FROM_BYTES_TO_KB;
+    if max_total_kb != 0 && max_single_kb != 0 && current_total_kb > 
max_total_kb {
+        return Err(format!(
+            "Total log size exceeds maximum:{} expected: '{}'KB",
+            log_dir_display, max_total_kb,
+        ));
+    } else if max_total_kb != 0
+        && max_single_kb != 0
+        && present_file_amount as u64 > max_total_kb / max_single_kb
+    {
+        return Err(format!(
+            "Total log file amount exceeds:{} expected: '{}'",
+            log_dir_display,
+            max_total_kb / max_single_kb,
+        ));
+    }
+
+    Ok(())
+}
+
+fn is_valid_iggy_log_file(file_name: &str) -> bool {
+    if file_name == IGGY_LOG_BASE_NAME {
+        return true;
+    }
+
+    let archive_log_prefix = format!("{}.", IGGY_LOG_BASE_NAME);
+    if file_name.starts_with(&archive_log_prefix) {
+        let numeric_suffix = &file_name[archive_log_prefix.len()..];
+        return !numeric_suffix.is_empty() && numeric_suffix.chars().all(|c| 
c.is_ascii_digit());
+    }
+    false
+}
+
+/// Solely for manual && direct observation of file status to
+/// reduce debugging overhead. Due to the different nature of
+/// asynchronous tasks,  the output order of scenarios may be
+/// mixed, but the mutex can prevent messy terminal output.
+async fn nocapture_observer(log_path: &Path, title: &str, done: bool) -> () {
+    let _lock = PRINT_LOCK.lock().await;
+    eprintln!(
+        "\n{:>4}\x1b[33m Size\x1b[0m <-> \x1b[33mPath\x1b[0m && 
server::specific::log_rotation_should_be_valid::\x1b[33m{}\x1b[0m",
+        "", title,
+    );
+
+    let mut dir_entries = fs::read_dir(log_path).await.unwrap();
+    while let Some(entry) = dir_entries.next_entry().await.unwrap() {
+        let file_path = entry.path();
+        if file_path.is_file() {
+            let meta = fs::metadata(&file_path).await.unwrap();
+            eprintln!(
+                "{:>6} KB <-> {:<50}",
+                meta.len() / FROM_BYTES_TO_KB,
+                file_path.display()
+            );
+        }
+    }
+
+    if done {
+        eprintln!(
+            "\n\x1b[32m [Passed]\x1b[0m <-> {:<25} <{:->45}>\n",
+            title, "",
+        );
+    }
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index 3253493c4..b04866f37 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod create_message_payload;
 pub mod cross_protocol_pat_scenario;
 pub mod delete_segments_scenario;
 pub mod encryption_scenario;
+pub mod log_rotation_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod offset_scenario;
diff --git a/core/integration/tests/server/specific.rs 
b/core/integration/tests/server/specific.rs
index 3e6849488..afd4a3a0c 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -134,7 +135,7 @@ async fn tcp_tls_self_signed_scenario_should_be_valid() {
         .await
         .expect("Failed to connect TLS client with self-signed cert");
 
-    let client = 
iggy::clients::client::IggyClient::create(ClientWrapper::Iggy(client), None, 
None);
+    let client = IggyClient::create(ClientWrapper::Iggy(client), None, None);
 
     tcp_tls_scenario::run(&client).await;
 }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 6789e6cb0..7f84a50b6 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -66,6 +66,7 @@ error_set = { workspace = true }
 figlet-rs = { workspace = true }
 figment = { workspace = true }
 flume = { workspace = true }
+fs2 = "0.4.3"
 futures = { workspace = true }
 hash32 = { workspace = true }
 human-repr = { workspace = true }
@@ -89,6 +90,7 @@ rand = { workspace = true }
 reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
 ringbuffer = { workspace = true }
 rmp-serde = { workspace = true }
+rolling-file = "0.2.0"
 rust-embed = { workspace = true, optional = true }
 rustls = { workspace = true }
 rustls-pemfile = { workspace = true }
diff --git a/core/server/config.toml b/core/server/config.toml
index cec71ddc7..3949b9cd5 100644
--- a/core/server/config.toml
+++ b/core/server/config.toml
@@ -363,10 +363,24 @@ level = "info"
 # When enabled, logs are stored in {system.path}/{system.logging.path} 
(default: local_data/logs).
 file_enabled = true
 
-# Maximum size of the log files before rotation.
-max_size = "512 MB"
-
-# Time to retain log files before deletion.
+# Maximum size of a single log file before rotation occurs. When a log
+# file reaches this size,  it will be rotated   (closed and a new file
+# created). This setting works together with max_total_size to control
+# log storage.  You can set it to 0 to enable unlimited size of single
+# log,  but all logs will be written to a single file,  thus disabling
+# log rotation. Please configure 0 with caution, esp. RUST_LOG > debug
+max_file_size = "500 MB"
+
+# Maximum total size of all log files.  When this size is reached,
+# the oldest log files will be deleted first. Set it to 0 to allow
+# an unlimited number of archived logs. This does not disable time
+# based log rotation or per-log-file size limits.
+max_total_size = "4 GB"
+
+# Time interval for checking log rotation status. Avoid less than 1s.
+rotation_check_interval = "1 h"
+
+# Time to retain log files before deletion. Avoid less than 1s, too.
 retention = "7 days"
 
 # Interval for printing system information to the log.
diff --git a/core/server/src/configs/defaults.rs 
b/core/server/src/configs/defaults.rs
index af283442e..ce9a84ab4 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -402,7 +402,14 @@ impl Default for LoggingConfig {
             path: SERVER_CONFIG.system.logging.path.parse().unwrap(),
             level: SERVER_CONFIG.system.logging.level.parse().unwrap(),
             file_enabled: SERVER_CONFIG.system.logging.file_enabled,
-            max_size: SERVER_CONFIG.system.logging.max_size.parse().unwrap(),
+            max_file_size: 
SERVER_CONFIG.system.logging.max_file_size.parse().unwrap(),
+            max_total_size: 
SERVER_CONFIG.system.logging.max_total_size.parse().unwrap(),
+            rotation_check_interval: SERVER_CONFIG
+                .system
+                .logging
+                .rotation_check_interval
+                .parse()
+                .unwrap(),
             retention: SERVER_CONFIG.system.logging.retention.parse().unwrap(),
             sysinfo_print_interval: SERVER_CONFIG
                 .system
diff --git a/core/server/src/configs/displays.rs 
b/core/server/src/configs/displays.rs
index 8973b356a..515011c25 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -170,7 +170,7 @@ impl Display for ServerConfig {
 }
 
 impl Display for MessageSaverConfig {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
             "{{ enabled: {}, enforce_fsync: {}, interval: {} }}",
@@ -249,11 +249,13 @@ impl Display for LoggingConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ path: {}, level: {}, file_enabled: {}, max_size: {}, 
retention: {} }}",
+            "{{ path: {}, level: {}, file_enabled: {}, max_file_size: {}, 
max_total_size: {}, rotation_check_interval: {}, retention: {} }}",
             self.path,
             self.level,
             self.file_enabled,
-            self.max_size.as_human_string_with_zero_as_unlimited(),
+            self.max_file_size.as_human_string_with_zero_as_unlimited(),
+            self.max_total_size.as_human_string_with_zero_as_unlimited(),
+            self.rotation_check_interval,
             self.retention
         )
     }
diff --git a/core/server/src/configs/system.rs 
b/core/server/src/configs/system.rs
index f8c7423ed..8e1205967 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -86,7 +86,12 @@ pub struct LoggingConfig {
     pub level: String,
     pub file_enabled: bool,
     #[config_env(leaf)]
-    pub max_size: IggyByteSize,
+    pub max_file_size: IggyByteSize,
+    #[config_env(leaf)]
+    pub max_total_size: IggyByteSize,
+    #[config_env(leaf)]
+    #[serde_as(as = "DisplayFromStr")]
+    pub rotation_check_interval: IggyDuration,
     #[config_env(leaf)]
     #[serde_as(as = "DisplayFromStr")]
     pub retention: IggyDuration,
diff --git a/core/server/src/configs/validators.rs 
b/core/server/src/configs/validators.rs
index 987b0d86a..ceb0fe30d 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -22,7 +22,7 @@ use super::server::{
     DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, 
TelemetryConfig,
 };
 use super::sharding::{CpuAllocation, ShardingConfig};
-use super::system::{CompressionConfig, PartitionConfig};
+use super::system::{CompressionConfig, LoggingConfig, PartitionConfig};
 use crate::configs::COMPONENT;
 use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig, 
ServerConfig};
 use crate::configs::sharding::NumaTopology;
@@ -82,6 +82,13 @@ impl Validatable<ConfigurationError> for ServerConfig {
             format!("{COMPONENT} (error: {e}) - failed to validate cluster 
config")
         })?;
 
+        self.system
+            .logging
+            .validate()
+            .error(|e: &ConfigurationError| {
+                format!("{COMPONENT} (error: {e}) - failed to validate logging 
config")
+            })?;
+
         let topic_size = match self.system.topic.max_size {
             MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
             MaxTopicSize::Unlimited => Ok(u64::MAX),
@@ -250,6 +257,44 @@ impl Validatable<ConfigurationError> for 
PersonalAccessTokenConfig {
     }
 }
 
+impl Validatable<ConfigurationError> for LoggingConfig {
+    fn validate(&self) -> Result<(), ConfigurationError> {
+        if self.level.is_empty() {
+            error!("system.logging.level is supposed be configured");
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        if self.retention.as_secs() < 1 {
+            error!(
+                "Configured system.logging.retention {} is less than minimum 1 
second",
+                self.retention
+            );
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        if self.rotation_check_interval.as_secs() < 1 {
+            error!(
+                "Configured system.logging.rotation_check_interval {} is less 
than minimum 1 second",
+                self.rotation_check_interval
+            );
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        let max_total_size_unlimited = self.max_total_size.as_bytes_u64() == 0;
+        if !max_total_size_unlimited
+            && self.max_file_size.as_bytes_u64() > 
self.max_total_size.as_bytes_u64()
+        {
+            error!(
+                "Configured system.logging.max_total_size {} is less than 
system.logging.max_file_size {}",
+                self.max_total_size, self.max_file_size
+            );
+            return Err(ConfigurationError::InvalidConfigurationValue);
+        }
+
+        Ok(())
+    }
+}
+
 impl Validatable<ConfigurationError> for MemoryPoolConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.enabled && self.size == 0 {
diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs
index 4c0e0691e..b9b4d2aa6 100644
--- a/core/server/src/log/logger.rs
+++ b/core/server/src/log/logger.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -21,6 +22,7 @@ use crate::configs::server::{TelemetryConfig, 
TelemetryTransport};
 use crate::configs::system::LoggingConfig;
 use crate::log::runtime::CompioRuntime;
 use crate::server_error::LogError;
+use iggy_common::{IggyByteSize, IggyDuration};
 use opentelemetry::KeyValue;
 use opentelemetry::global;
 use opentelemetry::trace::TracerProvider;
@@ -30,10 +32,14 @@ use opentelemetry_sdk::Resource;
 use opentelemetry_sdk::logs::log_processor_with_async_runtime;
 use opentelemetry_sdk::propagation::TraceContextPropagator;
 use opentelemetry_sdk::trace::span_processor_with_async_runtime;
+use rolling_file::{BasicRollingFileAppender, RollingConditionBasic};
+use std::fs;
 use std::io::{self, Write};
 use std::path::PathBuf;
+use std::sync::atomic::AtomicBool;
 use std::sync::{Arc, Mutex};
-use tracing::{info, trace};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, trace, warn};
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_opentelemetry::OpenTelemetryLayer;
 use tracing_subscriber::field::{RecordFields, VisitOutput};
@@ -47,15 +53,16 @@ use tracing_subscriber::{
 };
 
 const IGGY_LOG_FILE_PREFIX: &str = "iggy-server.log";
+const ONE_HUNDRED_THOUSAND: u64 = 100_000;
 
 // Writer that does nothing
 struct NullWriter;
 impl Write for NullWriter {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         Ok(buf.len())
     }
 
-    fn flush(&mut self) -> std::io::Result<()> {
+    fn flush(&mut self) -> io::Result<()> {
         Ok(())
     }
 }
@@ -63,13 +70,13 @@ impl Write for NullWriter {
 // Wrapper around Arc<Mutex<Vec<String>>> to implement Write
 struct VecStringWriter(Arc<Mutex<Vec<String>>>);
 impl Write for VecStringWriter {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         let mut lock = self.0.lock().unwrap();
         lock.push(String::from_utf8_lossy(buf).into_owned());
         Ok(buf.len())
     }
 
-    fn flush(&mut self) -> std::io::Result<()> {
+    fn flush(&mut self) -> io::Result<()> {
         // Just nop, we don't need to flush anything
         Ok(())
     }
@@ -128,6 +135,9 @@ pub struct Logging {
     otel_traces_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
 
     early_logs_buffer: Arc<Mutex<Vec<String>>>,
+    rotation_should_stop: Arc<AtomicBool>,
+    rotation_thread: Option<std::thread::JoinHandle<()>>,
+    rotation_stop_sender: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
 }
 
 impl Logging {
@@ -140,7 +150,10 @@ impl Logging {
             env_filter_reload_handle: None,
             otel_logs_reload_handle: None,
             otel_traces_reload_handle: None,
+            rotation_thread: None,
+            rotation_stop_sender: Arc::new(Mutex::new(None)),
             early_logs_buffer: Arc::new(Mutex::new(vec![])),
+            rotation_should_stop: Arc::new(AtomicBool::new(false)),
         }
     }
 
@@ -213,7 +226,7 @@ impl Logging {
         // Use the rolling appender to avoid having a huge log file.
         // Make sure logs are dumped to the file during graceful shutdown.
 
-        trace!("Logging config: {}", config);
+        trace!("Logging config: {config}");
 
         // Reload EnvFilter with config level if RUST_LOG is not set.
         // Config level supports EnvFilter syntax (e.g., 
"warn,server=debug,iggy=trace").
@@ -232,7 +245,7 @@ impl Logging {
         };
 
         // Initialize non-blocking stdout layer
-        let (non_blocking_stdout, stdout_guard) = 
tracing_appender::non_blocking(std::io::stdout());
+        let (non_blocking_stdout, stdout_guard) = 
tracing_appender::non_blocking(io::stdout());
         let stdout_layer = fmt::Layer::default()
             .with_ansi(true)
             .event_format(Self::get_log_format())
@@ -252,9 +265,51 @@ impl Logging {
         let logs_path = if config.file_enabled {
             let base_directory = PathBuf::from(base_directory);
             let logs_subdirectory = PathBuf::from(config.path.clone());
-            let logs_path = base_directory.join(logs_subdirectory.clone());
-            let file_appender =
-                tracing_appender::rolling::hourly(logs_path.clone(), 
IGGY_LOG_FILE_PREFIX);
+            let logs_subdirectory = logs_subdirectory
+                .canonicalize()
+                .unwrap_or(logs_subdirectory);
+            let logs_path = base_directory.join(logs_subdirectory);
+
+            if let Err(e) = fs::create_dir_all(&logs_path) {
+                warn!("Failed to create logs directory {logs_path:?}: {e}");
+                return Err(LogError::FileReloadFailure);
+            }
+
+            // Check available disk space, at least 10MB
+            let min_disk_space: u64 = ONE_HUNDRED_THOUSAND * 100;
+            if let Ok(available_space) = fs2::available_space(&logs_path) {
+                if available_space < min_disk_space {
+                    warn!(
+                        "Low disk space for logs. Available: {available_space} 
bytes, Recommended: {min_disk_space} bytes"
+                    );
+                }
+            } else {
+                warn!("Failed to check available disk space for logs 
directory: {logs_path:?}");
+            }
+
+            let max_files = Self::calculate_max_files(config.max_total_size, 
config.max_file_size);
+
+            // If max_file_size == 0, then keep interpreting behavior as same
+            // as fn IggyByteSize::as_human_string_with_zero_as_unlimited do.
+            // This will cover all log rotations if expecting unlimited.
+            let mut condition_builder = RollingConditionBasic::new();
+            let max_file_size_bytes = config.max_file_size.as_bytes_u64();
+
+            if max_file_size_bytes != 0 {
+                condition_builder = 
condition_builder.max_size(max_file_size_bytes).hourly();
+            }
+            let condition = condition_builder;
+
+            let file_appender = BasicRollingFileAppender::new(
+                logs_path.join(IGGY_LOG_FILE_PREFIX),
+                condition,
+                max_files,
+            )
+            .map_err(|e| {
+                error!("Failed to create file appender: {e}");
+                LogError::FileReloadFailure
+            })?;
+
             let (mut non_blocking_file, file_guard) = 
tracing_appender::non_blocking(file_appender);
 
             self.dump_to_file(&mut non_blocking_file);
@@ -284,9 +339,11 @@ impl Logging {
             self.init_telemetry(telemetry_config)?;
         }
 
+        self.rotation_thread = self.install_log_rotation_handler(config, 
logs_path.as_ref());
+
         if let Some(logs_path) = logs_path {
             info!(
-                "Logging initialized, logs will be stored at: {logs_path:?}. 
Logs will be rotated hourly. Log filter: {log_filter}."
+                "Logging initialized, logs will be stored at: {logs_path:?}. 
Logs will be rotated based on size. Log filter: {log_filter}."
             );
         } else {
             info!("Logging initialized (file output disabled). Log filter: 
{log_filter}.");
@@ -387,8 +444,8 @@ impl Logging {
             .expect("Failed to modify telemetry traces layer");
 
         info!(
-            "Telemetry initialized with service name: {}",
-            telemetry_config.service_name
+            "Telemetry initialized with service name: {config_service_name}",
+            config_service_name = telemetry_config.service_name
         );
         Ok(())
     }
@@ -397,10 +454,6 @@ impl Logging {
         Format::default().with_thread_names(true)
     }
 
-    fn _install_log_rotation_handler(&self) {
-        todo!("Implement log rotation handler based on size and retention 
time");
-    }
-
     fn print_build_info() {
         if option_env!("IGGY_CI_BUILD") == Some("true") {
             let hash = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
@@ -408,8 +461,7 @@ impl Logging {
             let rust_version = 
option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("unknown");
             let target = 
option_env!("VERGEN_CARGO_TARGET_TRIPLE").unwrap_or("unknown");
             info!(
-                "Version: {VERSION}, hash: {}, built at: {} using rust 
version: {} for target: {}",
-                hash, built_at, rust_version, target
+                "Version: {VERSION}, hash: {hash}, built at: {built_at} using 
rust version: {rust_version} for target: {target}"
             );
         } else {
             info!(
@@ -417,6 +469,281 @@ impl Logging {
             )
         }
     }
+
+    fn calculate_max_files(
+        max_total_size_bytes: IggyByteSize,
+        max_file_size_bytes: IggyByteSize,
+    ) -> usize {
+        if max_total_size_bytes == 0 {
+            // If the third attribute of BasicRollingFileAppender::new()
+            // is `usize::MAX` then it would reach iter capability.
+            ONE_HUNDRED_THOUSAND as usize
+        } else if max_file_size_bytes == 0 {
+            1
+        } else {
+            let max_files =
+                max_total_size_bytes.as_bytes_u64() / 
max_file_size_bytes.as_bytes_u64();
+            max_files.clamp(1, ONE_HUNDRED_THOUSAND) as usize
+        }
+    }
+
+    fn install_log_rotation_handler(
+        &self,
+        config: &LoggingConfig,
+        logs_path: Option<&PathBuf>,
+    ) -> Option<std::thread::JoinHandle<()>> {
+        let logs_path = logs_path?;
+        let path = logs_path.to_path_buf();
+        let max_total_size = config.max_total_size;
+        let max_file_size = config.max_file_size;
+        let rotation_check_interval = config.rotation_check_interval;
+        let retention = config.retention;
+        let should_stop = Arc::clone(&self.rotation_should_stop);
+
+        let (tx, rx) = std::sync::mpsc::channel::<()>();
+        *self.rotation_stop_sender.lock().unwrap() = Some(tx.clone());
+
+        let handle = std::thread::Builder::new()
+            .name("log-rotation".to_string())
+            .spawn(move || {
+                Self::run_log_rotation_loop(
+                    path,
+                    retention,
+                    max_total_size,
+                    max_file_size,
+                    rotation_check_interval,
+                    should_stop,
+                    rx,
+                )
+            })
+            .expect("Failed to spawn log rotation thread");
+
+        Some(handle)
+    }
+
+    fn run_log_rotation_loop(
+        path: PathBuf,
+        retention: IggyDuration,
+        max_total_size: IggyByteSize,
+        max_file_size: IggyByteSize,
+        check_interval: IggyDuration,
+        should_stop: Arc<AtomicBool>,
+        rx: std::sync::mpsc::Receiver<()>,
+    ) {
+        loop {
+            if should_stop.load(std::sync::atomic::Ordering::Relaxed) {
+                debug!("Log rotation thread detected stop flag, exiting");
+                break;
+            }
+
+            match rx.recv_timeout(check_interval.get_duration()) {
+                Ok(_) => {
+                    debug!("Log rotation thread received channel stop signal, 
exiting");
+                    break;
+                }
+                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
+                    Self::cleanup_log_files(&path, retention, max_total_size, 
max_file_size);
+                }
+                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
+                    warn!("Log rotation channel disconnected, exiting thread");
+                    break;
+                }
+            }
+        }
+
+        debug!("Log rotation thread exited gracefully");
+    }
+
+    fn read_log_files(logs_path: &PathBuf) -> Vec<(fs::DirEntry, SystemTime, 
Duration, u64)> {
+        let entries = match fs::read_dir(logs_path) {
+            Ok(entries) => entries,
+            Err(e) => {
+                warn!("Failed to read log directory {logs_path:?}: {e}");
+                return Vec::new();
+            }
+        };
+
+        let mut file_entries = Vec::new();
+
+        for entry in entries.flatten() {
+            if let Some(file_name) = entry.file_name().to_str() {
+                if file_name == IGGY_LOG_FILE_PREFIX {
+                    // Skip the actively written primary log file
+                    continue;
+                }
+                if !file_name.starts_with(IGGY_LOG_FILE_PREFIX) {
+                    continue;
+                }
+            } else {
+                continue;
+            }
+
+            let metadata = match entry.metadata() {
+                Ok(metadata) => metadata,
+                Err(e) => {
+                    warn!(
+                        "Failed to get metadata for {entry_path:?}: {e}",
+                        entry_path = entry.path()
+                    );
+                    continue;
+                }
+            };
+
+            if !metadata.is_file() {
+                continue;
+            }
+
+            let modified = match metadata.modified() {
+                Ok(modified) => modified,
+                Err(e) => {
+                    warn!(
+                        "Failed to get modification time for {entry_path:?}: 
{e}",
+                        entry_path = entry.path()
+                    );
+                    continue;
+                }
+            };
+
+            let elapsed = match modified.duration_since(UNIX_EPOCH) {
+                Ok(elapsed) => elapsed,
+                Err(e) => {
+                    warn!(
+                        "Failed to calculate elapsed time for {entry_path:?}: 
{e}",
+                        entry_path = entry.path()
+                    );
+                    continue;
+                }
+            };
+
+            let file_size = metadata.len();
+            file_entries.push((entry, modified, elapsed, file_size));
+        }
+
+        file_entries
+    }
+
+    fn cleanup_log_files(
+        logs_path: &PathBuf,
+        retention: IggyDuration,
+        max_total_size: IggyByteSize,
+        max_file_size: IggyByteSize,
+    ) {
+        debug!("Starting log cleanup for directory: {logs_path:?}");
+        debug!(
+            "retention: {retention:?}, max_total_size: {max_total_size} bytes, 
max_single_file_size: {max_file_size} bytes"
+        );
+
+        let mut file_entries = Self::read_log_files(logs_path);
+        debug!(
+            "Processed {file_entries_len} log files from directory: 
{logs_path:?}",
+            file_entries_len = file_entries.len(),
+        );
+
+        let mut removed_files_count = 0;
+        let cutoff = if !retention.is_zero() {
+            match SystemTime::now().duration_since(UNIX_EPOCH) {
+                Ok(now) => Some(now - retention.get_duration()),
+                Err(e) => {
+                    warn!("Failed to get current time: {e}");
+                    return;
+                }
+            }
+        } else {
+            None
+        };
+
+        let mut expired_file_indices = Vec::new();
+        for (idx, tuple) in file_entries.iter().enumerate() {
+            let entry = &tuple.0;
+            let elapsed = &tuple.2;
+
+            let mut need_remove = false;
+            if let Some(cutoff) = &cutoff
+                && *elapsed < *cutoff
+            {
+                need_remove = true;
+                debug!(
+                    "Mark old log file for remove: {entry_path:?}",
+                    entry_path = entry.path()
+                );
+            }
+
+            if need_remove {
+                expired_file_indices.push(idx);
+            }
+        }
+
+        for &idx in expired_file_indices.iter().rev() {
+            let entry = &file_entries[idx];
+            if fs::remove_file(entry.0.path()).is_ok() {
+                debug!(
+                    "Removed log file: {entry_path:?}",
+                    entry_path = entry.0.path()
+                );
+                removed_files_count += 1;
+                file_entries.remove(idx);
+            } else {
+                warn!(
+                    "Failed to remove log file {entry_path:?}",
+                    entry_path = entry.0.path()
+                );
+            }
+        }
+
+        let total_size = file_entries
+            .iter()
+            .map(|(_, _, _, size)| IggyByteSize::new(*size))
+            .sum::<IggyByteSize>();
+
+        let notification = |path: &PathBuf, count: &i32| {
+            if count > &0 {
+                info!("Logs cleaned up for directory: {path:?}. Removed 
{count} files.");
+            }
+        };
+
+        // Setting total max log size to 0 disables only total size
+        // rotation,  with other limits remain effective, including
+        // per-file size limitation,  preserving structural order.
+        if max_total_size == 0 {
+            notification(logs_path, &removed_files_count);
+            return;
+        }
+
+        if total_size > max_total_size {
+            file_entries.sort_unstable_by_key(|(_, mtime, _, _)| *mtime);
+
+            let mut remaining_size = total_size;
+            let mut to_remove = Vec::new();
+
+            for (idx, (_entry, _, _, fsize)) in 
file_entries.iter().enumerate() {
+                if remaining_size <= max_total_size {
+                    break;
+                }
+                to_remove.push((idx, *fsize));
+                remaining_size = 
remaining_size.saturating_sub(&IggyByteSize::from(*fsize));
+            }
+
+            for (idx, fsize) in to_remove.iter().rev() {
+                let entry = &file_entries[*idx];
+                if fs::remove_file(entry.0.path()).is_ok() {
+                    debug!(
+                        "Removed log file (size control): {:?} freed {:.2} 
MiB",
+                        entry.0.path(),
+                        *fsize as f64 / 1_048_576.0
+                    );
+                    removed_files_count += 1;
+                    file_entries.remove(*idx);
+                } else {
+                    warn!(
+                        "Failed to remove log file for size control: {:?}",
+                        entry.0.path()
+                    );
+                }
+            }
+        }
+
+        notification(logs_path, &removed_files_count);
+    }
 }
 
 impl Default for Logging {
@@ -425,6 +752,29 @@ impl Default for Logging {
     }
 }
 
+impl Drop for Logging {
+    fn drop(&mut self) {
+        self.rotation_should_stop
+            .store(true, std::sync::atomic::Ordering::Relaxed);
+        debug!("Set rotation_should_stop to true for log rotation thread");
+
+        if let Ok(sender_guard) = self.rotation_stop_sender.lock()
+            && let Some(ref sender) = *sender_guard
+        {
+            let _ = sender.send(()).map_err(|e| {
+                warn!("Failed to send stop signal to log rotation thread: 
{e}");
+            });
+        }
+
+        if let Some(handle) = self.rotation_thread.take() {
+            match handle.join() {
+                Ok(_) => debug!("Log rotation thread joined successfully"),
+                Err(e) => warn!("Failed to join log rotation thread: {e:?}"),
+            }
+        }
+    }
+}
+
 // This is a workaround for a bug with `with_ansi` setting in tracing
 // Bug thread: https://github.com/tokio-rs/tracing/issues/3116
 struct NoAnsiFields {}
@@ -440,3 +790,88 @@ impl<'writer> FormatFields<'writer> for NoAnsiFields {
         a.finish()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    #[test]
+    fn test_log_directory_creation() {
+        let temp_dir = TempDir::new().expect("Failed to create temporary 
directory");
+        let base_path = temp_dir.path().to_str().unwrap().to_string();
+        let log_subdir = "test_logs".to_string();
+
+        let log_path = PathBuf::from(&base_path).join(&log_subdir);
+        assert!(!log_path.exists());
+        fs::create_dir_all(&log_path).expect("Failed to create log directory");
+        assert!(log_path.exists());
+    }
+
+    #[test]
+    fn test_disk_space_check() {
+        let temp_dir = TempDir::new().expect("Failed to create temporary 
directory");
+        let log_path = temp_dir.path();
+        let result = fs2::available_space(log_path);
+        assert!(result.is_ok());
+
+        let available_space = result.unwrap();
+        assert!(available_space > 0);
+    }
+
+    #[test]
+    fn test_calculate_max_files() {
+        assert_eq!(
+            Logging::calculate_max_files(IggyByteSize::from(100), 
IggyByteSize::from(0)),
+            1 // Enable unlimited size of single log, the value won't be used 
actually
+        );
+        assert_eq!(
+            Logging::calculate_max_files(IggyByteSize::from(0), 
IggyByteSize::from(100)),
+            ONE_HUNDRED_THOUSAND as usize // Allow an unlimited number of 
archived logs
+        );
+        assert_eq!(
+            Logging::calculate_max_files(
+                IggyByteSize::from(ONE_HUNDRED_THOUSAND * 10),
+                IggyByteSize::from(1)
+            ),
+            ONE_HUNDRED_THOUSAND as usize // Result should be limited to 
ONE_HUNDRED_THOUSAND by clamp
+        );
+        assert_eq!(
+            Logging::calculate_max_files(IggyByteSize::from(1000), 
IggyByteSize::from(100)),
+            10
+        );
+        assert_eq!(
+            Logging::calculate_max_files(IggyByteSize::from(500), 
IggyByteSize::from(100)),
+            5
+        );
+        assert_eq!(
+            Logging::calculate_max_files(IggyByteSize::from(2000), 
IggyByteSize::from(100)),
+            20
+        );
+        assert_eq!(
+            Logging::calculate_max_files(IggyByteSize::from(50), 
IggyByteSize::from(100)),
+            1
+        );
+    }
+
+    #[test]
+    fn test_cleanup_log_files_functions() {
+        use std::time::Duration;
+        let temp_dir = TempDir::new().expect("Failed to create temporary 
directory");
+        let log_path = temp_dir.path().to_path_buf();
+        Logging::cleanup_log_files(
+            &log_path,
+            IggyDuration::new(Duration::from_secs(3600)),
+            IggyByteSize::from(2048 * 1024),
+            IggyByteSize::from(512 * 1024),
+        );
+    }
+
+    #[test]
+    fn test_logging_creation() {
+        let logging = Logging::new();
+        assert!(logging.stdout_guard.is_none());
+        assert!(logging.file_guard.is_none());
+        assert!(logging.env_filter_reload_handle.is_none());
+    }
+}
diff --git a/foreign/cpp/tests/e2e/server.toml 
b/foreign/cpp/tests/e2e/server.toml
index de67367aa..fb5f87b35 100644
--- a/foreign/cpp/tests/e2e/server.toml
+++ b/foreign/cpp/tests/e2e/server.toml
@@ -270,10 +270,28 @@ path = "logs"
 # Level of logging detail. Options: "debug", "info", "warn", "error".
 level = "info"
 
-# Maximum size of the log files before rotation.
-max_size = "512 MB"
-
-# Time to retain log files before deletion.
+# Whether to write logs to file. When false, logs are only written to stdout.
+# When enabled, logs are stored in {system.path}/{system.logging.path} 
(default: local_data/logs).
+file_enabled = true
+
+# Maximum size of a single log file before rotation occurs. When a log
+# file reaches this size,  it will be rotated   (closed and a new file
+# created). This setting works together with max_total_size to control
+# log storage.  You can set it to 0 to enable unlimited size of single
+# log,  but all logs will be written to a single file,  thus disabling
+# log rotation. Please configure 0 with caution, esp. RUST_LOG > debug
+max_file_size = "500 MB"
+
+# Maximum total size of all log files.  When this size is reached,
+# the oldest log files will be deleted first. Set it to 0 to allow
+# an unlimited number of archived logs. This does not disable time
+# based log rotation or per-log-file size limits.
+max_total_size = "4 GB"
+
+# Time interval for checking log rotation status. Avoid less than 1s.
+rotation_check_interval = "1 h"
+
+# Time to retain log files before deletion. Avoid less than 1s, too.
 retention = "7 days"
 
 # Interval for printing system information to the log.

Reply via email to