hubcio commented on code in PR #2452:
URL: https://github.com/apache/iggy/pull/2452#discussion_r2657084977
##########
core/server/src/log/logger.rs:
##########
@@ -397,26 +448,286 @@ impl Logging {
Format::default().with_thread_names(true)
}
- fn _install_log_rotation_handler(&self) {
- todo!("Implement log rotation handler based on size and retention
time");
- }
-
/// Logs build metadata (git hash, build timestamp, rustc version, target
/// triple) for CI builds, or a short developer notice otherwise.
///
/// All values are captured at compile time via `option_env!`; a build made
/// without the VERGEN_* variables falls back to "unknown".
fn print_build_info() {
    // Guard clause: non-CI builds only get the developer notice.
    if option_env!("IGGY_CI_BUILD") != Some("true") {
        info!(
            "It seems that you are a developer. Environment variable IGGY_CI_BUILD is not set to 'true', skipping build info print."
        );
        return;
    }

    let hash = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
    let built_at = option_env!("VERGEN_BUILD_TIMESTAMP").unwrap_or("unknown");
    let rust_version = option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("unknown");
    let target = option_env!("VERGEN_CARGO_TARGET_TRIPLE").unwrap_or("unknown");
    info!(
        "Version: {VERSION}, hash: {hash}, built at: {built_at} using rust version: {rust_version} for target: {target}"
    );
}
+
/// Derives how many rotated log files may be kept, given the total on-disk
/// budget and the per-file size limit (both in bytes).
///
/// When `max_file_size_bytes` is zero the division is undefined, so a
/// default of 10 files is used. Otherwise the quotient is kept within
/// [1, 1000] so at least one file survives and the count stays bounded.
fn calculate_max_files(max_total_size_bytes: u64, max_file_size_bytes: u64) -> usize {
    const DEFAULT_MAX_FILES: usize = 10;

    if max_file_size_bytes == 0 {
        DEFAULT_MAX_FILES
    } else {
        let budgeted = max_total_size_bytes / max_file_size_bytes;
        budgeted.clamp(1, 1000) as usize
    }
}
+
+ fn install_log_rotation_handler(
+ &self,
+ config: &LoggingConfig,
+ logs_path: Option<&PathBuf>,
+ ) -> Option<std::thread::JoinHandle<()>> {
+ if let Some(logs_path) = logs_path {
+ let path = logs_path.to_path_buf();
+ let max_total_size_bytes = config.max_total_size.as_bytes_u64();
+ let max_file_size_bytes = config.max_file_size.as_bytes_u64();
+ let rotation_check_interval = config.rotation_check_interval;
+ let retention = config.retention.get_duration();
+ let should_stop = Arc::clone(&self.rotation_should_stop);
+
+ let (tx, rx) = std::sync::mpsc::channel::<()>();
+
+ {
+ let mut sender_guard =
self.rotation_stop_sender.lock().unwrap();
+ *sender_guard = Some(tx.clone());
+ }
+
+ let handle = std::thread::Builder::new()
+ .name("log-rotation".to_string())
+ .spawn(move || {
+ loop {
+ if
should_stop.load(std::sync::atomic::Ordering::Relaxed) {
+ debug!("Log rotation thread detected stop flag,
exiting");
+ break;
+ }
+
+ match
rx.recv_timeout(rotation_check_interval.get_duration()) {
+ Ok(_) => {
+ debug!("Log rotation thread received channel
stop signal, exiting");
+ break;
+ }
+ Err(std::sync::mpsc::RecvTimeoutError::Timeout) =>
{
+ Self::cleanup_log_files(
+ &path,
+ retention,
+ max_total_size_bytes,
+ max_file_size_bytes,
+ );
+ }
+
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
+ warn!("Log rotation channel disconnected,
exiting thread");
+ break;
+ }
+ }
+ }
+ debug!("Log rotation thread exited gracefully");
+ })
+ .expect("Failed to spawn log rotation thread");
+
+ Some(handle)
+ } else {
+ None
+ }
+ }
+
+ fn read_log_files(
+ logs_path: &PathBuf,
+ ) -> Vec<(fs::DirEntry, std::time::SystemTime, Duration, u64)> {
+ use std::fs;
+ use std::time::UNIX_EPOCH;
+
+ let entries = match fs::read_dir(logs_path) {
+ Ok(entries) => entries,
+ Err(e) => {
+ warn!("Failed to read log directory {logs_path:?}: {e}");
+ return Vec::new();
+ }
+ };
+
+ let mut file_entries = Vec::new();
+
+ for entry in entries.flatten() {
+ if let Some(file_name) = entry.file_name().to_str() {
+ if file_name == IGGY_LOG_FILE_PREFIX {
+ continue;
+ }
+ if !file_name.starts_with(IGGY_LOG_FILE_PREFIX) {
+ continue;
+ }
+ } else {
+ continue;
+ }
+
+ let metadata = match entry.metadata() {
+ Ok(metadata) => metadata,
+ Err(e) => {
+ warn!(
+ "Failed to get metadata for {entry_path:?}: {e}",
+ entry_path = entry.path()
+ );
+ continue;
+ }
+ };
+
+ if !metadata.is_file() {
+ continue;
+ }
+
+ let modified = match metadata.modified() {
+ Ok(modified) => modified,
+ Err(e) => {
+ warn!(
+ "Failed to get modification time for {entry_path:?}:
{e}",
+ entry_path = entry.path()
+ );
+ continue;
+ }
+ };
+
+ let elapsed = match modified.duration_since(UNIX_EPOCH) {
+ Ok(elapsed) => elapsed,
+ Err(e) => {
+ warn!(
+ "Failed to calculate elapsed time for {entry_path:?}:
{e}",
+ entry_path = entry.path()
+ );
+ continue;
+ }
+ };
+
+ let file_size = metadata.len();
+ file_entries.push((entry, modified, elapsed, file_size));
+ }
+
+ file_entries
+ }
+
+ /// Removes log files under `logs_path` that violate the retention window,
+ /// the per-file size limit, or the total-size budget (a zero limit
+ /// disables the corresponding check). Runs on the log-rotation thread.
+ /// NOTE: this function is truncated in this diff hunk (cut at the final
+ /// `info!`), so the tail is not reviewable here.
+ fn cleanup_log_files(
+ logs_path: &PathBuf,
+ retention: Duration,
+ max_total_size_bytes: u64,
+ max_file_size_bytes: u64,
+ ) {
+ use std::fs;
+ use std::time::{SystemTime, UNIX_EPOCH};
+
+ debug!(
+ "Starting log cleanup for directory: {logs_path:?}, retention:
{retention:?}, max_total_size: {max_total_size_bytes} bytes,
max_single_file_size: {max_file_size_bytes} bytes"
+ );
+
+ let mut file_entries = Self::read_log_files(logs_path);
+ debug!(
+ "Processed {file_entries_len} log files from directory:
{logs_path:?}",
+ file_entries_len = file_entries.len(),
+ );
+
+ let mut removed_files_count = 0;
+ // Zero retention disables age-based cleanup: no cutoff is computed.
+ // Otherwise the cutoff is "now minus retention" as a duration since
+ // the Unix epoch, matching the per-file timestamps from read_log_files.
+ let cutoff = if !retention.is_zero() {
+ match SystemTime::now().duration_since(UNIX_EPOCH) {
+ Ok(now) => Some(now - retention),
+ Err(e) => {
+ warn!("Failed to get current time: {e}");
+ return;
+ }
+ }
+ } else {
+ None
+ };
+
+ // Pass 1: mark files that are older than the cutoff or individually
+ // larger than the per-file limit.
+ let mut to_remove = Vec::new();
+ for (idx, (entry, _, elapsed, file_size)) in
file_entries.iter().enumerate() {
+ let mut need_remove = false;
+ if let Some(cutoff) = &cutoff
+ && *elapsed < *cutoff
+ {
+ need_remove = true;
+ debug!(
+ "Mark old log file for remove: {entry_path:?}",
+ entry_path = entry.path()
+ );
+ }
+
+ if !need_remove && max_file_size_bytes > 0 && *file_size >
max_file_size_bytes {
+ need_remove = true;
+ debug!(
+ "Mark oversized log file for remove (size: {} bytes,
limit: {} bytes): {entry_path:?}",
+ file_size,
+ max_file_size_bytes,
+ entry_path = entry.path()
+ );
+ }
+
+ if need_remove {
+ to_remove.push(idx);
+ }
+ }
+
+ // Delete marked files, retrying up to 3 times with a 100 ms pause.
+ // Indices are walked in reverse so that Vec::remove does not shift
+ // entries still pending removal.
+ for &idx in to_remove.iter().rev() {
+ let entry = &file_entries[idx];
+ let mut remove_success = false;
+ for _retry in 1..=3 {
+ if fs::remove_file(entry.0.path()).is_ok() {
+ remove_success = true;
+ break;
+ }
+ std::thread::sleep(Duration::from_millis(100));
+ }
+
+ if remove_success {
+ debug!(
+ "Removed log file: {entry_path:?}",
+ entry_path = entry.0.path()
+ );
+ removed_files_count += 1;
+ file_entries.remove(idx);
+ } else {
+ warn!(
+ "Failed to remove log file {entry_path:?} after 3 retries",
+ entry_path = entry.0.path()
+ );
+ }
+ }
+
+ // Pass 2: if the surviving files still exceed the total-size budget,
+ // delete oldest-first (sorted by modification time) until under it.
+ // NOTE(review): `max_total_size_bytes > 0` already implies
+ // `skip_size_cleanup` is false, so the flag is redundant in this
+ // condition — consider simplifying.
+ let skip_size_cleanup = max_total_size_bytes == 0 &&
max_file_size_bytes == 0;
+ if !skip_size_cleanup && max_total_size_bytes > 0 {
+ let total_size: u64 = file_entries.iter().map(|(_, _, _, size)|
*size).sum();
+ if total_size > max_total_size_bytes {
+ file_entries.sort_unstable_by_key(|(_, modified, _, _)|
*modified);
+
+ let mut current_size = total_size;
+ let mut to_remove_total = Vec::new();
+ for (idx, (_entry, _, _, file_size)) in
file_entries.iter().enumerate() {
+ if current_size <= max_total_size_bytes {
+ break;
+ }
+ to_remove_total.push((idx, *file_size));
+ current_size = current_size.saturating_sub(*file_size);
+ }
+
+ // Reverse order again so earlier indices stay valid while
+ // removing from file_entries.
+ for (idx, file_size) in to_remove_total.iter().rev() {
+ let entry = &file_entries[*idx];
+ if fs::remove_file(entry.0.path()).is_ok() {
+ debug!(
+ "Removed log file to control size: {entry_path:?}
freed {file_size} bytes",
+ entry_path = entry.0.path(),
+ );
+ removed_files_count += 1;
+ file_entries.remove(*idx);
+ } else {
+ warn!(
+ "Failed to remove log file {entry_path:?} for size
control",
+ entry_path = entry.0.path()
+ );
+ }
+ }
+ }
+ }
+
+ // NOTE(review): per review feedback, don't emit this summary log when
+ // removed_files_count == 0.
+ info!(
Review Comment:
don't print anything if 0 files were removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]