This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 873b63965 feat(server): implement log rotation based on size and
retention (#2452)
873b63965 is described below
commit 873b63965998e07ed7b50994286d1397b6359dd5
Author: Svecco <[email protected]>
AuthorDate: Mon Feb 2 17:43:48 2026 +0800
feat(server): implement log rotation based on size and retention (#2452)
- implemented log rotation based on size and retention as the title;
- implemented configurable attributes and introduced breaking changes;
- added unit and integration tests in logger.rs and the integration mod;
- added documentation and imported new dependencies, etc.
---
Cargo.lock | 21 +
DEPENDENCIES.md | 2 +
core/common/src/utils/byte_size.rs | 7 +
core/common/src/utils/duration.rs | 29 ++
core/integration/src/test_server.rs | 4 +-
.../server/scenarios/log_rotation_scenario.rs | 382 +++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/integration/tests/server/specific.rs | 5 +-
core/server/Cargo.toml | 2 +
core/server/config.toml | 22 +-
core/server/src/configs/defaults.rs | 9 +-
core/server/src/configs/displays.rs | 8 +-
core/server/src/configs/system.rs | 7 +-
core/server/src/configs/validators.rs | 47 +-
core/server/src/log/logger.rs | 475 ++++++++++++++++++++-
foreign/cpp/tests/e2e/server.toml | 26 +-
16 files changed, 1009 insertions(+), 38 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index b43d62fe0..f2806dd02 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3302,6 +3302,16 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "fs2"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
[[package]]
name = "fs_extra"
version = "1.3.0"
@@ -7642,6 +7652,15 @@ dependencies = [
"byteorder",
]
+[[package]]
+name = "rolling-file"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8395b4f860856b740f20a296ea2cd4d823e81a2658cf05ef61be22916026a906"
+dependencies = [
+ "chrono",
+]
+
[[package]]
name = "route-recognizer"
version = "0.3.1"
@@ -8273,6 +8292,7 @@ dependencies = [
"figlet-rs",
"figment",
"flume 0.12.0",
+ "fs2",
"futures",
"hash32 1.0.0",
"human-repr",
@@ -8297,6 +8317,7 @@ dependencies = [
"reqwest",
"ringbuffer",
"rmp-serde",
+ "rolling-file",
"rust-embed",
"rustls",
"rustls-pemfile",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index a7fd8c256..924cbd10d 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -293,6 +293,7 @@ foreign-types-shared: 0.1.1, "Apache-2.0 OR MIT",
form_urlencoded: 1.2.2, "Apache-2.0 OR MIT",
fragile: 2.0.1, "Apache-2.0",
fs-err: 3.2.2, "Apache-2.0 OR MIT",
+fs2: 0.4.3, "Apache-2.0 OR MIT",
fs_extra: 1.3.0, "MIT",
fsevent-sys: 4.1.0, "MIT",
funty: 2.0.0, "MIT",
@@ -661,6 +662,7 @@ rmcp-macros: 0.14.0, "Apache-2.0",
rmp: 0.8.15, "MIT",
rmp-serde: 1.3.1, "MIT",
roaring: 0.11.3, "Apache-2.0 OR MIT",
+rolling-file: 0.2.0, "Apache-2.0 OR MIT",
route-recognizer: 0.3.1, "MIT",
rsa: 0.9.10, "Apache-2.0 OR MIT",
rust-embed: 8.11.0, "MIT",
diff --git a/core/common/src/utils/byte_size.rs
b/core/common/src/utils/byte_size.rs
index 2ccf58bee..230f8b484 100644
--- a/core/common/src/utils/byte_size.rs
+++ b/core/common/src/utils/byte_size.rs
@@ -137,6 +137,13 @@ impl IggyByteSize {
format!("{:.2}", self.0.get_appropriate_unit(UnitType::Decimal))
}
+ /// Subtracts another `IggyByteSize` value, returning zero if the result
would be negative (saturating subtraction).
+ pub fn saturating_sub(&self, other: &Self) -> Self {
+ let self_bytes = self.as_bytes_u64();
+ let other_bytes = other.as_bytes_u64();
+ IggyByteSize::new(self_bytes.saturating_sub(other_bytes))
+ }
+
/// Calculates the throughput based on the provided duration and returns a
human-readable string.
pub(crate) fn _as_human_throughput_string(&self, duration: &IggyDuration)
-> String {
if duration.is_zero() {
diff --git a/core/common/src/utils/duration.rs
b/core/common/src/utils/duration.rs
index cb256808a..aa6c796a7 100644
--- a/core/common/src/utils/duration.rs
+++ b/core/common/src/utils/duration.rs
@@ -29,6 +29,35 @@ use std::{
pub const SEC_IN_MICRO: u64 = 1_000_000;
+/// A struct for representing time durations with various utility functions.
+///
+/// This struct wraps `std::time::Duration` and uses the `humantime` crate for
parsing and formatting
+/// human-readable duration strings. It also implements serialization and
deserialization via the `serde` crate.
+///
+/// # Example
+///
+/// ```
+/// use iggy_common::IggyDuration;
+/// use std::str::FromStr;
+///
+/// let duration = IggyDuration::from(3661_000_000_u64); // 3661 seconds in
microseconds
+/// assert_eq!(3661, duration.as_secs());
+/// assert_eq!("1h 1m 1s", duration.as_human_time_string());
+/// assert_eq!("1h 1m 1s", format!("{}", duration));
+///
+/// let duration = IggyDuration::from(0_u64);
+/// assert_eq!(0, duration.as_secs());
+/// assert_eq!("0s", duration.as_human_time_string());
+/// assert_eq!("0s", format!("{}", duration));
+///
+/// let duration = IggyDuration::from_str("1h 1m 1s").unwrap();
+/// assert_eq!(3661, duration.as_secs());
+/// assert_eq!("1h 1m 1s", duration.as_human_time_string());
+/// assert_eq!("1h 1m 1s", format!("{}", duration));
+///
+/// let duration = IggyDuration::from_str("unlimited").unwrap();
+/// assert_eq!(0, duration.as_secs());
+/// ```
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct IggyDuration {
duration: Duration,
diff --git a/core/integration/src/test_server.rs
b/core/integration/src/test_server.rs
index a58ad85a3..81c68b619 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -120,9 +120,9 @@ impl TestServer {
Ok(parallelism) => {
let available_cpus = parallelism.get();
if available_cpus >= 4 {
- let mut rng = rand::thread_rng();
+ let mut rng = rand::rng();
let max_start = available_cpus - 4;
- let start = rng.gen_range(0..=max_start);
+ let start = rng.random_range(0..=max_start);
let end = start + 4;
format!("{}..{}", start, end)
} else {
diff --git a/core/integration/tests/server/scenarios/log_rotation_scenario.rs
b/core/integration/tests/server/scenarios/log_rotation_scenario.rs
new file mode 100644
index 000000000..69ccccd41
--- /dev/null
+++ b/core/integration/tests/server/scenarios/log_rotation_scenario.rs
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{PARTITIONS_COUNT, STREAM_NAME, TOPIC_NAME};
+use iggy::prelude::*;
+use iggy_common::{
+ CompressionAlgorithm, Identifier, IggyByteSize, IggyDuration, IggyExpiry,
MaxTopicSize,
+};
+use integration::tcp_client::TcpClientFactory;
+use integration::test_server::{ClientFactory, IpAddrKind, TestServer,
login_root};
+use once_cell::sync::Lazy;
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::path::Path;
+use std::time::Duration;
+use test_case::test_matrix;
+use tokio::fs;
+use tokio::sync::Mutex;
+use tokio::time::{sleep, timeout};
+
+const RETENTION_SECS: u64 = 30;
+const OPERATION_TIMEOUT_SECS: u64 = 10;
+const OPERATION_LOOP_COUNT: usize = 300;
+const FROM_BYTES_TO_KB: u64 = 1000;
+const IGGY_LOG_BASE_NAME: &str = "iggy-server.log";
+
+static PRINT_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
+
+#[derive(Debug)]
+pub struct LogRotationTestConfig {
+ pub name: String,
+ pub max_single_log_size: IggyByteSize,
+ pub max_total_log_size: IggyByteSize,
+ pub rotation_check_interval: IggyDuration,
+ pub retention: IggyDuration,
+}
+
+fn config_regular_rotation() -> LogRotationTestConfig {
+ LogRotationTestConfig {
+ name: "log_regular_rotation".to_string(),
+ max_single_log_size: IggyByteSize::new(100_000),
+ max_total_log_size: IggyByteSize::new(400_000),
+ rotation_check_interval: IggyDuration::ONE_SECOND,
+ retention: IggyDuration::new_from_secs(RETENTION_SECS),
+ }
+}
+
+fn config_unlimited_size() -> LogRotationTestConfig {
+ LogRotationTestConfig {
+ name: "log_unlimited_size".to_string(),
+ max_single_log_size: IggyByteSize::new(0),
+ max_total_log_size: IggyByteSize::new(400_000),
+ rotation_check_interval: IggyDuration::ONE_SECOND,
+ retention: IggyDuration::new_from_secs(RETENTION_SECS),
+ }
+}
+
+fn config_unlimited_archives() -> LogRotationTestConfig {
+ LogRotationTestConfig {
+ name: "log_unlimited_archives".to_string(),
+ max_single_log_size: IggyByteSize::new(100_000),
+ max_total_log_size: IggyByteSize::new(0),
+ rotation_check_interval: IggyDuration::ONE_SECOND,
+ retention: IggyDuration::new_from_secs(RETENTION_SECS),
+ }
+}
+
+fn config_special_scenario() -> LogRotationTestConfig {
+ LogRotationTestConfig {
+ name: "log_special_scenario".to_string(),
+ max_single_log_size: IggyByteSize::new(0),
+ max_total_log_size: IggyByteSize::new(0),
+ rotation_check_interval: IggyDuration::ONE_SECOND,
+ retention: IggyDuration::new_from_secs(RETENTION_SECS),
+ }
+}
+
+#[test_matrix(
+ [config_regular_rotation(), config_unlimited_size(),
config_unlimited_archives(), config_special_scenario()]
+)]
+#[tokio::test]
+#[parallel]
+async fn log_rotation_should_be_valid(present_log_config:
LogRotationTestConfig) {
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert(
+ "IGGY_SYSTEM_LOGGING_MAX_FILE_SIZE".to_string(),
+ format!("{}", present_log_config.max_single_log_size),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_LOGGING_MAX_TOTAL_SIZE".to_string(),
+ format!("{}", present_log_config.max_total_log_size),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_LOGGING_ROTATION_CHECK_INTERVAL".to_string(),
+ format!("{}", present_log_config.rotation_check_interval),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_LOGGING_RETENTION".to_string(),
+ format!("{}", present_log_config.retention),
+ );
+
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
+ test_server.start();
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let client_factory = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ };
+
+ let log_dir = format!("{}/logs", test_server.get_local_data_path());
+
+ test_server.assert_running();
+ run(&client_factory, &log_dir, present_log_config).await;
+}
+
+async fn run(
+ client_factory: &dyn ClientFactory,
+ log_dir: &str,
+ present_log_config: LogRotationTestConfig,
+) {
+ let done_status = false;
+ let present_log_test_title = present_log_config.name.clone();
+ let log_path = Path::new(log_dir);
+ assert!(
+ log_path.exists() && log_path.is_dir(),
+ "failed::no_such_directory => {log_dir}",
+ );
+
+ let client = init_valid_client(client_factory).await;
+ assert!(
+ client.is_ok(),
+ "failed::client_initialize => {:?}",
+ client.as_ref().err(),
+ );
+
+ let generator_result =
generate_enough_logs(client.as_ref().unwrap()).await;
+ assert!(
+ generator_result.is_ok(),
+ "failed::generate_logs => {:?}",
+ generator_result.as_ref().err(),
+ );
+
+ nocapture_observer(log_path, &present_log_test_title, done_status).await;
+ sleep(present_log_config.rotation_check_interval.get_duration()).await;
+
+ let rotation_result = validate_log_rotation_rules(log_path,
present_log_config).await;
+ assert!(
+ rotation_result.is_ok(),
+ "failed::rotation_check => {:?}",
+ rotation_result.as_ref().err(),
+ );
+
+ nocapture_observer(log_path, &present_log_test_title, !done_status).await;
+}
+
+async fn init_valid_client(client_factory: &dyn ClientFactory) ->
Result<IggyClient, String> {
+ let operation_timeout =
IggyDuration::new(Duration::from_secs(OPERATION_TIMEOUT_SECS));
+ let client_wrapper = timeout(
+ operation_timeout.get_duration(),
+ client_factory.create_client(),
+ )
+ .await
+ .map_err(|_| "ClientWrapper creation timed out")?;
+
+ timeout(operation_timeout.get_duration(), client_wrapper.connect())
+ .await
+ .map_err(|_| "Client connection timed out")?
+ .map_err(|e| format!("Client connection failed: {e:?}"))?;
+
+ let client = IggyClient::create(client_wrapper, None, None);
+ timeout(operation_timeout.get_duration(), login_root(&client))
+ .await
+ .map_err(|e| format!("Root user login timed out: {e:?}"))?;
+
+ Ok(client)
+}
+
+/// Loop through the creation and deletion of streams / topics
+/// to trigger server business operations, thereby generating
+/// sufficient log data to meet the trigger conditions for the
+/// log rotation test.
+async fn generate_enough_logs(client: &IggyClient) -> Result<(), String> {
+ for i in 0..OPERATION_LOOP_COUNT {
+ let stream_name = format!("{STREAM_NAME}-{i}");
+ let topic_name = format!("{TOPIC_NAME}-{i}");
+
+ client
+ .create_stream(&stream_name)
+ .await
+ .map_err(|e| format!("Failed to create {stream_name}: {e}"))?;
+
+ let stream_identifier = Identifier::named(&stream_name)
+ .map_err(|e| format!("Failed to create stream label {e}"))?;
+
+ client
+ .create_topic(
+ &stream_identifier,
+ &topic_name,
+ PARTITIONS_COUNT,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::Unlimited,
+ )
+ .await
+ .map_err(|e| format!("Failed to create topic {topic_name}: {e}"))?;
+
+ client
+ .delete_stream(&stream_identifier)
+ .await
+ .map_err(|e| format!("Failed to remove stream {stream_name}:
{e}"))?;
+ }
+
+ Ok(())
+}
+
+async fn validate_log_rotation_rules(
+ log_dir: &Path,
+ present_log_config: LogRotationTestConfig,
+) -> Result<(), String> {
+ let log_dir_display = log_dir.display();
+ let mut dir_entries = fs::read_dir(log_dir)
+ .await
+ .map_err(|e| format!("Failed to read log directory
'{log_dir_display}': {e}",))?;
+
+ let mut valid_log_files = Vec::new();
+ while let Some(entry) = dir_entries.next_entry().await.map_err(|e| {
+ format!("Failed to read next entry in log directory
'{log_dir_display}': {e}",)
+ })? {
+ let file_path = entry.path();
+
+ if !file_path.is_file() {
+ continue;
+ }
+
+ let file_name = match file_path.file_name().and_then(|name|
name.to_str()) {
+ Some(name) => name,
+ None => continue,
+ };
+
+ if is_valid_iggy_log_file(file_name) {
+ valid_log_files.push(file_path);
+ }
+ }
+
+ if valid_log_files.is_empty() {
+ return Err(format!(
+ "No valid Iggy log files found in directory '{}'. Expected files
matching '{}' (original) or '{}.<numeric>' (archived).",
+ log_dir_display, IGGY_LOG_BASE_NAME, IGGY_LOG_BASE_NAME
+ ));
+ }
+
+ // logger.rs => tracing_appender::non_blocking(file_appender);
+ // The delay in log writing in Iggy mainly depends on the processing speed
+ // of background threads and the operating system's I/O scheduling, which
+ // means that the actual size of written logs may be slightly larger than
+ // expected. So we tolerate tiny overflows by comparing integer KB
+ // values instead of exact bytes.
+
+ let mut total_log_size = IggyByteSize::new(0);
+ let max_single_kb = present_log_config.max_single_log_size.as_bytes_u64()
/ FROM_BYTES_TO_KB;
+ let max_total_kb = present_log_config.max_total_log_size.as_bytes_u64() /
FROM_BYTES_TO_KB;
+ let present_file_amount = valid_log_files.len();
+
+ if max_single_kb == 0 && present_file_amount > 1 {
+ return Err(format!(
+ "Log size should be unlimited if `max_file_size` is set to 0,
found {} files.",
+ present_file_amount,
+ ));
+ } else if max_total_kb == 0 && max_single_kb != 0 {
+ if present_file_amount as u64 <= 1 {
+ return Err(format!(
+ "Archives should be unlimited if `max_total_size` is set to 0,
found {} files.",
+ present_file_amount,
+ ));
+ }
+ } else {
+ for log_file in valid_log_files {
+ let file_metadata = fs::metadata(&log_file).await.map_err(|e| {
+ format!(
+ "Failed to get metadata for file '{}': {}",
+ log_file.display(),
+ e
+ )
+ })?;
+
+ let file_size_bytes = file_metadata.len();
+ if max_single_kb != 0 {
+ let current_single_kb = file_size_bytes / FROM_BYTES_TO_KB;
+ if current_single_kb > max_single_kb {
+ return Err(format!(
+ "Single log file exceeds maximum allowed size: '{}'",
+ log_file.display()
+ ));
+ }
+ }
+
+ total_log_size += IggyByteSize::new(file_size_bytes);
+ }
+ }
+
+ let current_total_kb = total_log_size.as_bytes_u64() / FROM_BYTES_TO_KB;
+ if max_total_kb != 0 && max_single_kb != 0 && current_total_kb >
max_total_kb {
+ return Err(format!(
+ "Total log size exceeds maximum:{} expected: '{}'KB",
+ log_dir_display, max_total_kb,
+ ));
+ } else if max_total_kb != 0
+ && max_single_kb != 0
+ && present_file_amount as u64 > max_total_kb / max_single_kb
+ {
+ return Err(format!(
+ "Total log file amount exceeds:{} expected: '{}'",
+ log_dir_display,
+ max_total_kb / max_single_kb,
+ ));
+ }
+
+ Ok(())
+}
+
+fn is_valid_iggy_log_file(file_name: &str) -> bool {
+ if file_name == IGGY_LOG_BASE_NAME {
+ return true;
+ }
+
+ let archive_log_prefix = format!("{}.", IGGY_LOG_BASE_NAME);
+ if file_name.starts_with(&archive_log_prefix) {
+ let numeric_suffix = &file_name[archive_log_prefix.len()..];
+ return !numeric_suffix.is_empty() && numeric_suffix.chars().all(|c|
c.is_ascii_digit());
+ }
+ false
+}
+
+/// Solely for manual and direct observation of file status to
+/// reduce debugging overhead. Because the scenarios run as
+/// asynchronous tasks, their output order may interleave, but
+/// the mutex prevents messy terminal output.
+async fn nocapture_observer(log_path: &Path, title: &str, done: bool) -> () {
+ let _lock = PRINT_LOCK.lock().await;
+ eprintln!(
+ "\n{:>4}\x1b[33m Size\x1b[0m <-> \x1b[33mPath\x1b[0m &&
server::specific::log_rotation_should_be_valid::\x1b[33m{}\x1b[0m",
+ "", title,
+ );
+
+ let mut dir_entries = fs::read_dir(log_path).await.unwrap();
+ while let Some(entry) = dir_entries.next_entry().await.unwrap() {
+ let file_path = entry.path();
+ if file_path.is_file() {
+ let meta = fs::metadata(&file_path).await.unwrap();
+ eprintln!(
+ "{:>6} KB <-> {:<50}",
+ meta.len() / FROM_BYTES_TO_KB,
+ file_path.display()
+ );
+ }
+ }
+
+ if done {
+ eprintln!(
+ "\n\x1b[32m [Passed]\x1b[0m <-> {:<25} <{:->45}>\n",
+ title, "",
+ );
+ }
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index 3253493c4..b04866f37 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod create_message_payload;
pub mod cross_protocol_pat_scenario;
pub mod delete_segments_scenario;
pub mod encryption_scenario;
+pub mod log_rotation_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod offset_scenario;
diff --git a/core/integration/tests/server/specific.rs
b/core/integration/tests/server/specific.rs
index 3e6849488..afd4a3a0c 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -134,7 +135,7 @@ async fn tcp_tls_self_signed_scenario_should_be_valid() {
.await
.expect("Failed to connect TLS client with self-signed cert");
- let client =
iggy::clients::client::IggyClient::create(ClientWrapper::Iggy(client), None,
None);
+ let client = IggyClient::create(ClientWrapper::Iggy(client), None, None);
tcp_tls_scenario::run(&client).await;
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 6789e6cb0..7f84a50b6 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -66,6 +66,7 @@ error_set = { workspace = true }
figlet-rs = { workspace = true }
figment = { workspace = true }
flume = { workspace = true }
+fs2 = "0.4.3"
futures = { workspace = true }
hash32 = { workspace = true }
human-repr = { workspace = true }
@@ -89,6 +90,7 @@ rand = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
ringbuffer = { workspace = true }
rmp-serde = { workspace = true }
+rolling-file = "0.2.0"
rust-embed = { workspace = true, optional = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
diff --git a/core/server/config.toml b/core/server/config.toml
index cec71ddc7..3949b9cd5 100644
--- a/core/server/config.toml
+++ b/core/server/config.toml
@@ -363,10 +363,24 @@ level = "info"
# When enabled, logs are stored in {system.path}/{system.logging.path}
(default: local_data/logs).
file_enabled = true
-# Maximum size of the log files before rotation.
-max_size = "512 MB"
-
-# Time to retain log files before deletion.
+# Maximum size of a single log file before rotation occurs. When a log
+# file reaches this size, it will be rotated (closed and a new file
+# created). This setting works together with max_total_size to control
+# log storage. You can set it to 0 to allow an unlimited size for a
+# single log file, but then all logs will be written to one file, thus
+# disabling log rotation. Please configure 0 with caution, esp. RUST_LOG > debug
+max_file_size = "500 MB"
+
+# Maximum total size of all log files. When this size is reached,
+# the oldest log files will be deleted first. Set it to 0 to allow
+# an unlimited number of archived logs. This does not disable time
+# based log rotation or per-log-file size limits.
+max_total_size = "4 GB"
+
+# Time interval for checking log rotation status. Avoid less than 1s.
+rotation_check_interval = "1 h"
+
+# Time to retain log files before deletion. Avoid less than 1s, too.
retention = "7 days"
# Interval for printing system information to the log.
diff --git a/core/server/src/configs/defaults.rs
b/core/server/src/configs/defaults.rs
index af283442e..ce9a84ab4 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -402,7 +402,14 @@ impl Default for LoggingConfig {
path: SERVER_CONFIG.system.logging.path.parse().unwrap(),
level: SERVER_CONFIG.system.logging.level.parse().unwrap(),
file_enabled: SERVER_CONFIG.system.logging.file_enabled,
- max_size: SERVER_CONFIG.system.logging.max_size.parse().unwrap(),
+ max_file_size:
SERVER_CONFIG.system.logging.max_file_size.parse().unwrap(),
+ max_total_size:
SERVER_CONFIG.system.logging.max_total_size.parse().unwrap(),
+ rotation_check_interval: SERVER_CONFIG
+ .system
+ .logging
+ .rotation_check_interval
+ .parse()
+ .unwrap(),
retention: SERVER_CONFIG.system.logging.retention.parse().unwrap(),
sysinfo_print_interval: SERVER_CONFIG
.system
diff --git a/core/server/src/configs/displays.rs
b/core/server/src/configs/displays.rs
index 8973b356a..515011c25 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -170,7 +170,7 @@ impl Display for ServerConfig {
}
impl Display for MessageSaverConfig {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{ enabled: {}, enforce_fsync: {}, interval: {} }}",
@@ -249,11 +249,13 @@ impl Display for LoggingConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ path: {}, level: {}, file_enabled: {}, max_size: {},
retention: {} }}",
+ "{{ path: {}, level: {}, file_enabled: {}, max_file_size: {},
max_total_size: {}, rotation_check_interval: {}, retention: {} }}",
self.path,
self.level,
self.file_enabled,
- self.max_size.as_human_string_with_zero_as_unlimited(),
+ self.max_file_size.as_human_string_with_zero_as_unlimited(),
+ self.max_total_size.as_human_string_with_zero_as_unlimited(),
+ self.rotation_check_interval,
self.retention
)
}
diff --git a/core/server/src/configs/system.rs
b/core/server/src/configs/system.rs
index f8c7423ed..8e1205967 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -86,7 +86,12 @@ pub struct LoggingConfig {
pub level: String,
pub file_enabled: bool,
#[config_env(leaf)]
- pub max_size: IggyByteSize,
+ pub max_file_size: IggyByteSize,
+ #[config_env(leaf)]
+ pub max_total_size: IggyByteSize,
+ #[config_env(leaf)]
+ #[serde_as(as = "DisplayFromStr")]
+ pub rotation_check_interval: IggyDuration,
#[config_env(leaf)]
#[serde_as(as = "DisplayFromStr")]
pub retention: IggyDuration,
diff --git a/core/server/src/configs/validators.rs
b/core/server/src/configs/validators.rs
index 987b0d86a..ceb0fe30d 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -22,7 +22,7 @@ use super::server::{
DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig,
TelemetryConfig,
};
use super::sharding::{CpuAllocation, ShardingConfig};
-use super::system::{CompressionConfig, PartitionConfig};
+use super::system::{CompressionConfig, LoggingConfig, PartitionConfig};
use crate::configs::COMPONENT;
use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig,
ServerConfig};
use crate::configs::sharding::NumaTopology;
@@ -82,6 +82,13 @@ impl Validatable<ConfigurationError> for ServerConfig {
format!("{COMPONENT} (error: {e}) - failed to validate cluster
config")
})?;
+ self.system
+ .logging
+ .validate()
+ .error(|e: &ConfigurationError| {
+ format!("{COMPONENT} (error: {e}) - failed to validate logging
config")
+ })?;
+
let topic_size = match self.system.topic.max_size {
MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
MaxTopicSize::Unlimited => Ok(u64::MAX),
@@ -250,6 +257,44 @@ impl Validatable<ConfigurationError> for
PersonalAccessTokenConfig {
}
}
+impl Validatable<ConfigurationError> for LoggingConfig {
+ fn validate(&self) -> Result<(), ConfigurationError> {
+ if self.level.is_empty() {
+ error!("system.logging.level is supposed to be configured");
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ if self.retention.as_secs() < 1 {
+ error!(
+ "Configured system.logging.retention {} is less than minimum 1
second",
+ self.retention
+ );
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ if self.rotation_check_interval.as_secs() < 1 {
+ error!(
+ "Configured system.logging.rotation_check_interval {} is less
than minimum 1 second",
+ self.rotation_check_interval
+ );
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ let max_total_size_unlimited = self.max_total_size.as_bytes_u64() == 0;
+ if !max_total_size_unlimited
+ && self.max_file_size.as_bytes_u64() >
self.max_total_size.as_bytes_u64()
+ {
+ error!(
+ "Configured system.logging.max_total_size {} is less than
system.logging.max_file_size {}",
+ self.max_total_size, self.max_file_size
+ );
+ return Err(ConfigurationError::InvalidConfigurationValue);
+ }
+
+ Ok(())
+ }
+}
+
impl Validatable<ConfigurationError> for MemoryPoolConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.enabled && self.size == 0 {
diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs
index 4c0e0691e..b9b4d2aa6 100644
--- a/core/server/src/log/logger.rs
+++ b/core/server/src/log/logger.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -21,6 +22,7 @@ use crate::configs::server::{TelemetryConfig,
TelemetryTransport};
use crate::configs::system::LoggingConfig;
use crate::log::runtime::CompioRuntime;
use crate::server_error::LogError;
+use iggy_common::{IggyByteSize, IggyDuration};
use opentelemetry::KeyValue;
use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
@@ -30,10 +32,14 @@ use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::log_processor_with_async_runtime;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::span_processor_with_async_runtime;
+use rolling_file::{BasicRollingFileAppender, RollingConditionBasic};
+use std::fs;
use std::io::{self, Write};
use std::path::PathBuf;
+use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
-use tracing::{info, trace};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, trace, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::field::{RecordFields, VisitOutput};
@@ -47,15 +53,16 @@ use tracing_subscriber::{
};
const IGGY_LOG_FILE_PREFIX: &str = "iggy-server.log";
+const ONE_HUNDRED_THOUSAND: u64 = 100_000;
// Writer that does nothing
struct NullWriter;
impl Write for NullWriter {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Ok(buf.len())
}
- fn flush(&mut self) -> std::io::Result<()> {
+ fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
@@ -63,13 +70,13 @@ impl Write for NullWriter {
// Wrapper around Arc<Mutex<Vec<String>>> to implement Write
struct VecStringWriter(Arc<Mutex<Vec<String>>>);
impl Write for VecStringWriter {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut lock = self.0.lock().unwrap();
lock.push(String::from_utf8_lossy(buf).into_owned());
Ok(buf.len())
}
- fn flush(&mut self) -> std::io::Result<()> {
+ fn flush(&mut self) -> io::Result<()> {
// Just nop, we don't need to flush anything
Ok(())
}
@@ -128,6 +135,9 @@ pub struct Logging {
otel_traces_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
early_logs_buffer: Arc<Mutex<Vec<String>>>,
+ rotation_should_stop: Arc<AtomicBool>,
+ rotation_thread: Option<std::thread::JoinHandle<()>>,
+ rotation_stop_sender: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
}
impl Logging {
@@ -140,7 +150,10 @@ impl Logging {
env_filter_reload_handle: None,
otel_logs_reload_handle: None,
otel_traces_reload_handle: None,
+ rotation_thread: None,
+ rotation_stop_sender: Arc::new(Mutex::new(None)),
early_logs_buffer: Arc::new(Mutex::new(vec![])),
+ rotation_should_stop: Arc::new(AtomicBool::new(false)),
}
}
@@ -213,7 +226,7 @@ impl Logging {
// Use the rolling appender to avoid having a huge log file.
// Make sure logs are dumped to the file during graceful shutdown.
- trace!("Logging config: {}", config);
+ trace!("Logging config: {config}");
// Reload EnvFilter with config level if RUST_LOG is not set.
// Config level supports EnvFilter syntax (e.g.,
"warn,server=debug,iggy=trace").
@@ -232,7 +245,7 @@ impl Logging {
};
// Initialize non-blocking stdout layer
- let (non_blocking_stdout, stdout_guard) =
tracing_appender::non_blocking(std::io::stdout());
+ let (non_blocking_stdout, stdout_guard) =
tracing_appender::non_blocking(io::stdout());
let stdout_layer = fmt::Layer::default()
.with_ansi(true)
.event_format(Self::get_log_format())
@@ -252,9 +265,51 @@ impl Logging {
let logs_path = if config.file_enabled {
let base_directory = PathBuf::from(base_directory);
let logs_subdirectory = PathBuf::from(config.path.clone());
- let logs_path = base_directory.join(logs_subdirectory.clone());
- let file_appender =
- tracing_appender::rolling::hourly(logs_path.clone(),
IGGY_LOG_FILE_PREFIX);
+ let logs_subdirectory = logs_subdirectory
+ .canonicalize()
+ .unwrap_or(logs_subdirectory);
+ let logs_path = base_directory.join(logs_subdirectory);
+
+ if let Err(e) = fs::create_dir_all(&logs_path) {
+ warn!("Failed to create logs directory {logs_path:?}: {e}");
+ return Err(LogError::FileReloadFailure);
+ }
+
+ // Warn if available disk space is below the recommended minimum (~10 MB)
+ let min_disk_space: u64 = ONE_HUNDRED_THOUSAND * 100;
+ if let Ok(available_space) = fs2::available_space(&logs_path) {
+ if available_space < min_disk_space {
+ warn!(
+ "Low disk space for logs. Available: {available_space}
bytes, Recommended: {min_disk_space} bytes"
+ );
+ }
+ } else {
+ warn!("Failed to check available disk space for logs
directory: {logs_path:?}");
+ }
+
+ let max_files = Self::calculate_max_files(config.max_total_size,
config.max_file_size);
+
+ // If max_file_size == 0, keep the same interpretation as
+ // IggyByteSize::as_human_string_with_zero_as_unlimited does:
+ // zero means unlimited, so size-based rotation is not applied.
+ let mut condition_builder = RollingConditionBasic::new();
+ let max_file_size_bytes = config.max_file_size.as_bytes_u64();
+
+ if max_file_size_bytes != 0 {
+ condition_builder =
condition_builder.max_size(max_file_size_bytes).hourly();
+ }
+ let condition = condition_builder;
+
+ let file_appender = BasicRollingFileAppender::new(
+ logs_path.join(IGGY_LOG_FILE_PREFIX),
+ condition,
+ max_files,
+ )
+ .map_err(|e| {
+ error!("Failed to create file appender: {e}");
+ LogError::FileReloadFailure
+ })?;
+
let (mut non_blocking_file, file_guard) =
tracing_appender::non_blocking(file_appender);
self.dump_to_file(&mut non_blocking_file);
@@ -284,9 +339,11 @@ impl Logging {
self.init_telemetry(telemetry_config)?;
}
+ self.rotation_thread = self.install_log_rotation_handler(config,
logs_path.as_ref());
+
if let Some(logs_path) = logs_path {
info!(
- "Logging initialized, logs will be stored at: {logs_path:?}.
Logs will be rotated hourly. Log filter: {log_filter}."
+ "Logging initialized, logs will be stored at: {logs_path:?}.
Logs will be rotated based on size. Log filter: {log_filter}."
);
} else {
info!("Logging initialized (file output disabled). Log filter:
{log_filter}.");
@@ -387,8 +444,8 @@ impl Logging {
.expect("Failed to modify telemetry traces layer");
info!(
- "Telemetry initialized with service name: {}",
- telemetry_config.service_name
+ "Telemetry initialized with service name: {config_service_name}",
+ config_service_name = telemetry_config.service_name
);
Ok(())
}
@@ -397,10 +454,6 @@ impl Logging {
Format::default().with_thread_names(true)
}
- fn _install_log_rotation_handler(&self) {
- todo!("Implement log rotation handler based on size and retention
time");
- }
-
fn print_build_info() {
if option_env!("IGGY_CI_BUILD") == Some("true") {
let hash = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
@@ -408,8 +461,7 @@ impl Logging {
let rust_version =
option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("unknown");
let target =
option_env!("VERGEN_CARGO_TARGET_TRIPLE").unwrap_or("unknown");
info!(
- "Version: {VERSION}, hash: {}, built at: {} using rust
version: {} for target: {}",
- hash, built_at, rust_version, target
+ "Version: {VERSION}, hash: {hash}, built at: {built_at} using
rust version: {rust_version} for target: {target}"
);
} else {
info!(
@@ -417,6 +469,281 @@ impl Logging {
)
}
}
+
+ fn calculate_max_files(
+ max_total_size_bytes: IggyByteSize,
+ max_file_size_bytes: IggyByteSize,
+ ) -> usize {
+ if max_total_size_bytes == 0 {
+ // If the third argument of BasicRollingFileAppender::new()
+ // were `usize::MAX`, it would hit the iterator capacity limit,
+ // so use a large finite value for the "unlimited" case instead.
+ ONE_HUNDRED_THOUSAND as usize
+ } else if max_file_size_bytes == 0 {
+ 1
+ } else {
+ let max_files =
+ max_total_size_bytes.as_bytes_u64() /
max_file_size_bytes.as_bytes_u64();
+ max_files.clamp(1, ONE_HUNDRED_THOUSAND) as usize
+ }
+ }
+
+ fn install_log_rotation_handler(
+ &self,
+ config: &LoggingConfig,
+ logs_path: Option<&PathBuf>,
+ ) -> Option<std::thread::JoinHandle<()>> {
+ let logs_path = logs_path?;
+ let path = logs_path.to_path_buf();
+ let max_total_size = config.max_total_size;
+ let max_file_size = config.max_file_size;
+ let rotation_check_interval = config.rotation_check_interval;
+ let retention = config.retention;
+ let should_stop = Arc::clone(&self.rotation_should_stop);
+
+ let (tx, rx) = std::sync::mpsc::channel::<()>();
+ *self.rotation_stop_sender.lock().unwrap() = Some(tx.clone());
+
+ let handle = std::thread::Builder::new()
+ .name("log-rotation".to_string())
+ .spawn(move || {
+ Self::run_log_rotation_loop(
+ path,
+ retention,
+ max_total_size,
+ max_file_size,
+ rotation_check_interval,
+ should_stop,
+ rx,
+ )
+ })
+ .expect("Failed to spawn log rotation thread");
+
+ Some(handle)
+ }
+
+ fn run_log_rotation_loop(
+ path: PathBuf,
+ retention: IggyDuration,
+ max_total_size: IggyByteSize,
+ max_file_size: IggyByteSize,
+ check_interval: IggyDuration,
+ should_stop: Arc<AtomicBool>,
+ rx: std::sync::mpsc::Receiver<()>,
+ ) {
+ loop {
+ if should_stop.load(std::sync::atomic::Ordering::Relaxed) {
+ debug!("Log rotation thread detected stop flag, exiting");
+ break;
+ }
+
+ match rx.recv_timeout(check_interval.get_duration()) {
+ Ok(_) => {
+ debug!("Log rotation thread received channel stop signal,
exiting");
+ break;
+ }
+ Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
+ Self::cleanup_log_files(&path, retention, max_total_size,
max_file_size);
+ }
+ Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
+ warn!("Log rotation channel disconnected, exiting thread");
+ break;
+ }
+ }
+ }
+
+ debug!("Log rotation thread exited gracefully");
+ }
+
+ fn read_log_files(logs_path: &PathBuf) -> Vec<(fs::DirEntry, SystemTime,
Duration, u64)> {
+ let entries = match fs::read_dir(logs_path) {
+ Ok(entries) => entries,
+ Err(e) => {
+ warn!("Failed to read log directory {logs_path:?}: {e}");
+ return Vec::new();
+ }
+ };
+
+ let mut file_entries = Vec::new();
+
+ for entry in entries.flatten() {
+ if let Some(file_name) = entry.file_name().to_str() {
+ if file_name == IGGY_LOG_FILE_PREFIX {
+ // Skip the actively written primary log file
+ continue;
+ }
+ if !file_name.starts_with(IGGY_LOG_FILE_PREFIX) {
+ continue;
+ }
+ } else {
+ continue;
+ }
+
+ let metadata = match entry.metadata() {
+ Ok(metadata) => metadata,
+ Err(e) => {
+ warn!(
+ "Failed to get metadata for {entry_path:?}: {e}",
+ entry_path = entry.path()
+ );
+ continue;
+ }
+ };
+
+ if !metadata.is_file() {
+ continue;
+ }
+
+ let modified = match metadata.modified() {
+ Ok(modified) => modified,
+ Err(e) => {
+ warn!(
+ "Failed to get modification time for {entry_path:?}:
{e}",
+ entry_path = entry.path()
+ );
+ continue;
+ }
+ };
+
+ let elapsed = match modified.duration_since(UNIX_EPOCH) {
+ Ok(elapsed) => elapsed,
+ Err(e) => {
+ warn!(
+ "Failed to calculate elapsed time for {entry_path:?}:
{e}",
+ entry_path = entry.path()
+ );
+ continue;
+ }
+ };
+
+ let file_size = metadata.len();
+ file_entries.push((entry, modified, elapsed, file_size));
+ }
+
+ file_entries
+ }
+
+ fn cleanup_log_files(
+ logs_path: &PathBuf,
+ retention: IggyDuration,
+ max_total_size: IggyByteSize,
+ max_file_size: IggyByteSize,
+ ) {
+ debug!("Starting log cleanup for directory: {logs_path:?}");
+ debug!(
+ "retention: {retention:?}, max_total_size: {max_total_size} bytes,
max_single_file_size: {max_file_size} bytes"
+ );
+
+ let mut file_entries = Self::read_log_files(logs_path);
+ debug!(
+ "Processed {file_entries_len} log files from directory:
{logs_path:?}",
+ file_entries_len = file_entries.len(),
+ );
+
+ let mut removed_files_count = 0;
+ let cutoff = if !retention.is_zero() {
+ match SystemTime::now().duration_since(UNIX_EPOCH) {
+ Ok(now) => Some(now - retention.get_duration()),
+ Err(e) => {
+ warn!("Failed to get current time: {e}");
+ return;
+ }
+ }
+ } else {
+ None
+ };
+
+ let mut expired_file_indices = Vec::new();
+ for (idx, tuple) in file_entries.iter().enumerate() {
+ let entry = &tuple.0;
+ let elapsed = &tuple.2;
+
+ let mut need_remove = false;
+ if let Some(cutoff) = &cutoff
+ && *elapsed < *cutoff
+ {
+ need_remove = true;
+ debug!(
+ "Mark old log file for remove: {entry_path:?}",
+ entry_path = entry.path()
+ );
+ }
+
+ if need_remove {
+ expired_file_indices.push(idx);
+ }
+ }
+
+ for &idx in expired_file_indices.iter().rev() {
+ let entry = &file_entries[idx];
+ if fs::remove_file(entry.0.path()).is_ok() {
+ debug!(
+ "Removed log file: {entry_path:?}",
+ entry_path = entry.0.path()
+ );
+ removed_files_count += 1;
+ file_entries.remove(idx);
+ } else {
+ warn!(
+ "Failed to remove log file {entry_path:?}",
+ entry_path = entry.0.path()
+ );
+ }
+ }
+
+ let total_size = file_entries
+ .iter()
+ .map(|(_, _, _, size)| IggyByteSize::new(*size))
+ .sum::<IggyByteSize>();
+
+ let notification = |path: &PathBuf, count: &i32| {
+ if count > &0 {
+ info!("Logs cleaned up for directory: {path:?}. Removed
{count} files.");
+ }
+ };
+
+ // Setting the total max log size to 0 disables only total-size
+ // rotation; the other limits remain effective, including the
+ // per-file size limit, and the existing file ordering is preserved.
+ if max_total_size == 0 {
+ notification(logs_path, &removed_files_count);
+ return;
+ }
+
+ if total_size > max_total_size {
+ file_entries.sort_unstable_by_key(|(_, mtime, _, _)| *mtime);
+
+ let mut remaining_size = total_size;
+ let mut to_remove = Vec::new();
+
+ for (idx, (_entry, _, _, fsize)) in
file_entries.iter().enumerate() {
+ if remaining_size <= max_total_size {
+ break;
+ }
+ to_remove.push((idx, *fsize));
+ remaining_size =
remaining_size.saturating_sub(&IggyByteSize::from(*fsize));
+ }
+
+ for (idx, fsize) in to_remove.iter().rev() {
+ let entry = &file_entries[*idx];
+ if fs::remove_file(entry.0.path()).is_ok() {
+ debug!(
+ "Removed log file (size control): {:?} freed {:.2}
MiB",
+ entry.0.path(),
+ *fsize as f64 / 1_048_576.0
+ );
+ removed_files_count += 1;
+ file_entries.remove(*idx);
+ } else {
+ warn!(
+ "Failed to remove log file for size control: {:?}",
+ entry.0.path()
+ );
+ }
+ }
+ }
+
+ notification(logs_path, &removed_files_count);
+ }
}
impl Default for Logging {
@@ -425,6 +752,29 @@ impl Default for Logging {
}
}
+impl Drop for Logging {
+ fn drop(&mut self) {
+ self.rotation_should_stop
+ .store(true, std::sync::atomic::Ordering::Relaxed);
+ debug!("Set rotation_should_stop to true for log rotation thread");
+
+ if let Ok(sender_guard) = self.rotation_stop_sender.lock()
+ && let Some(ref sender) = *sender_guard
+ {
+ let _ = sender.send(()).map_err(|e| {
+ warn!("Failed to send stop signal to log rotation thread:
{e}");
+ });
+ }
+
+ if let Some(handle) = self.rotation_thread.take() {
+ match handle.join() {
+ Ok(_) => debug!("Log rotation thread joined successfully"),
+ Err(e) => warn!("Failed to join log rotation thread: {e:?}"),
+ }
+ }
+ }
+}
+
// This is a workaround for a bug with `with_ansi` setting in tracing
// Bug thread: https://github.com/tokio-rs/tracing/issues/3116
struct NoAnsiFields {}
@@ -440,3 +790,88 @@ impl<'writer> FormatFields<'writer> for NoAnsiFields {
a.finish()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use tempfile::TempDir;
+
+ #[test]
+ fn test_log_directory_creation() {
+ let temp_dir = TempDir::new().expect("Failed to create temporary
directory");
+ let base_path = temp_dir.path().to_str().unwrap().to_string();
+ let log_subdir = "test_logs".to_string();
+
+ let log_path = PathBuf::from(&base_path).join(&log_subdir);
+ assert!(!log_path.exists());
+ fs::create_dir_all(&log_path).expect("Failed to create log directory");
+ assert!(log_path.exists());
+ }
+
+ #[test]
+ fn test_disk_space_check() {
+ let temp_dir = TempDir::new().expect("Failed to create temporary
directory");
+ let log_path = temp_dir.path();
+ let result = fs2::available_space(log_path);
+ assert!(result.is_ok());
+
+ let available_space = result.unwrap();
+ assert!(available_space > 0);
+ }
+
+ #[test]
+ fn test_calculate_max_files() {
+ assert_eq!(
+ Logging::calculate_max_files(IggyByteSize::from(100),
IggyByteSize::from(0)),
+ 1 // max_file_size == 0 means a single unlimited log file, so this value is never actually used
+ );
+ assert_eq!(
+ Logging::calculate_max_files(IggyByteSize::from(0),
IggyByteSize::from(100)),
+ ONE_HUNDRED_THOUSAND as usize // Allow an unlimited number of
archived logs
+ );
+ assert_eq!(
+ Logging::calculate_max_files(
+ IggyByteSize::from(ONE_HUNDRED_THOUSAND * 10),
+ IggyByteSize::from(1)
+ ),
+ ONE_HUNDRED_THOUSAND as usize // Result should be limited to
ONE_HUNDRED_THOUSAND by clamp
+ );
+ assert_eq!(
+ Logging::calculate_max_files(IggyByteSize::from(1000),
IggyByteSize::from(100)),
+ 10
+ );
+ assert_eq!(
+ Logging::calculate_max_files(IggyByteSize::from(500),
IggyByteSize::from(100)),
+ 5
+ );
+ assert_eq!(
+ Logging::calculate_max_files(IggyByteSize::from(2000),
IggyByteSize::from(100)),
+ 20
+ );
+ assert_eq!(
+ Logging::calculate_max_files(IggyByteSize::from(50),
IggyByteSize::from(100)),
+ 1
+ );
+ }
+
+ #[test]
+ fn test_cleanup_log_files_functions() {
+ use std::time::Duration;
+ let temp_dir = TempDir::new().expect("Failed to create temporary
directory");
+ let log_path = temp_dir.path().to_path_buf();
+ Logging::cleanup_log_files(
+ &log_path,
+ IggyDuration::new(Duration::from_secs(3600)),
+ IggyByteSize::from(2048 * 1024),
+ IggyByteSize::from(512 * 1024),
+ );
+ }
+
+ #[test]
+ fn test_logging_creation() {
+ let logging = Logging::new();
+ assert!(logging.stdout_guard.is_none());
+ assert!(logging.file_guard.is_none());
+ assert!(logging.env_filter_reload_handle.is_none());
+ }
+}
diff --git a/foreign/cpp/tests/e2e/server.toml
b/foreign/cpp/tests/e2e/server.toml
index de67367aa..fb5f87b35 100644
--- a/foreign/cpp/tests/e2e/server.toml
+++ b/foreign/cpp/tests/e2e/server.toml
@@ -270,10 +270,28 @@ path = "logs"
# Level of logging detail. Options: "debug", "info", "warn", "error".
level = "info"
-# Maximum size of the log files before rotation.
-max_size = "512 MB"
-
-# Time to retain log files before deletion.
+# Whether to write logs to file. When false, logs are only written to stdout.
+# When enabled, logs are stored in {system.path}/{system.logging.path}
(default: local_data/logs).
+file_enabled = true
+
+# Maximum size of a single log file before rotation occurs. When a log
+# file reaches this size, it will be rotated (closed and a new file
+# created). This setting works together with max_total_size to control
+# log storage. Setting it to 0 allows a single log file of unlimited
+# size, but then all logs are written to one file and log rotation is
+# disabled. Configure 0 with caution, especially with RUST_LOG at
+# debug or more verbose levels.
+max_file_size = "500 MB"
+
+# Maximum total size of all log files. When this size is reached,
+# the oldest log files will be deleted first. Set it to 0 to allow
+# an unlimited number of archived logs. This does not disable
+# time-based log rotation or per-log-file size limits.
+max_total_size = "4 GB"
+
+# Interval between log rotation checks. Avoid values below 1 second.
+rotation_check_interval = "1 h"
+
+# How long to retain log files before deletion. Avoid values below 1 second.
retention = "7 days"
# Interval for printing system information to the log.