This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ef46869 refactor(server): initialize logging before config parsing 
(#2442)
47ef46869 is described below

commit 47ef46869ccfd3371559c9d9d65de1c5bd09b022
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Dec 4 15:02:05 2025 +0100

    refactor(server): initialize logging before config parsing (#2442)
    
    - Move early_init() before load_config() to capture bootstrap logs
    - Move telemetry initialization from early_init() to late_init()
    - Add reload handles for telemetry layers with no-op placeholders
    - Replace println!/eprintln! with tracing macros in config modules
    - Fix RUST_LOG filtering: apply EnvFilter before output layers
    - Support full EnvFilter syntax in config level field (e.g.,
    "warn,server=debug")
    - Document level field syntax in server.toml
    
    RUST_LOG environment variable always takes precedence over config.
Finally, we can again use tracing macros during config parsing.
---
 core/common/src/configs/mod.rs        |  21 ++-
 core/configs/server.toml              |   5 +-
 core/server/src/configs/validators.rs |  38 ++--
 core/server/src/diagnostics.rs        |  17 +-
 core/server/src/log/logger.rs         | 327 +++++++++++++++++-----------------
 core/server/src/main.rs               |  28 +--
 6 files changed, 225 insertions(+), 211 deletions(-)

diff --git a/core/common/src/configs/mod.rs b/core/common/src/configs/mod.rs
index 88fd498f2..bf578b38c 100644
--- a/core/common/src/configs/mod.rs
+++ b/core/common/src/configs/mod.rs
@@ -34,6 +34,7 @@ use figment::{
 use serde::{Serialize, de::DeserializeOwned};
 use std::{env, fmt::Display, future::Future, marker::PhantomData, path::Path};
 use toml::{Value as TomlValue, map::Map as TomlMap};
+use tracing::{error, info, warn};
 
 const SECRET_MASK: &str = "******";
 const ARRAY_SEPARATOR: char = '_';
@@ -162,7 +163,7 @@ impl<T: ConfigurationType> CustomEnvProvider<T> {
                 value = SECRET_MASK.to_string();
             }
 
-            println!("{env_key} value changed to: {value} from environment 
variable");
+            info!("{env_key} value changed to: {value} from environment 
variable");
             Self::insert_environment_override(&source_dict, &mut target_dict, 
keys, env_var_value);
         }
 
@@ -752,7 +753,7 @@ fn file_exists<P: AsRef<Path>>(path: P) -> bool {
 
 impl<T: ConfigurationType, P: Provider + Clone> ConfigProvider<T> for 
FileConfigProvider<P> {
     async fn load_config(&self) -> Result<T, ConfigurationError> {
-        println!("Loading config from path: '{}'...", self.file_path);
+        info!("Loading config from path: '{}'...", self.file_path);
 
         // Start with the default configuration if provided
         let mut config_builder = Figment::new();
@@ -760,20 +761,22 @@ impl<T: ConfigurationType, P: Provider + Clone> 
ConfigProvider<T> for FileConfig
         if let Some(default) = &self.default_config {
             config_builder = config_builder.merge(default);
         } else {
-            println!("No default configuration provided.");
+            warn!("No default configuration provided.");
         }
 
         // If the config file exists, merge it into the configuration
         if file_exists(&self.file_path) {
-            println!("Found configuration file at path: '{}'.", 
self.file_path);
+            info!("Found configuration file at path: '{}'.", self.file_path);
             config_builder = config_builder.merge(Toml::file(&self.file_path));
         } else {
-            println!(
+            warn!(
                 "Configuration file not found at path: '{}'.",
                 self.file_path
             );
             if has_default {
-                println!("Using default configuration as no config file was 
found.");
+                info!(
+                    "Using default configuration embedded into server, as no 
config file was found."
+                );
             }
         }
 
@@ -785,17 +788,17 @@ impl<T: ConfigurationType, P: Provider + Clone> 
ConfigProvider<T> for FileConfig
 
         match config_result {
             Ok(config) => {
-                println!("Config loaded successfully.");
+                info!("Config loaded successfully.");
                 let display_config = env::var(DISPLAY_CONFIG_ENV)
                     .map(|val| val == "1" || val.to_lowercase() == "true")
                     .unwrap_or(self.display_config);
                 if display_config {
-                    println!("Using Config: {config}");
+                    info!("Using Config: {config}");
                 }
                 Ok(config)
             }
             Err(e) => {
-                println!("Failed to load config: {e}");
+                error!("Failed to load config: {e}");
                 Err(ConfigurationError::CannotLoadConfiguration)
             }
         }
diff --git a/core/configs/server.toml b/core/configs/server.toml
index 84c78aa22..8fde53d28 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -340,7 +340,10 @@ path = "runtime"
 # Path for storing log files.
 path = "logs"
 
-# Level of logging detail. Options: "debug", "info", "warn", "error".
+# Log filtering directive using the same syntax as the RUST_LOG environment 
variable.
+# Supports simple levels ("trace", "debug", "info", "warn", "error", "off" or 
"none")
+# as well as complex directives like "warn,server=debug,iggy=trace".
+# Note: RUST_LOG environment variable always takes precedence over this 
setting.
 level = "info"
 
 # Maximum size of the log files before rotation.
diff --git a/core/server/src/configs/validators.rs 
b/core/server/src/configs/validators.rs
index 40f829aca..ab0547dea 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -33,7 +33,7 @@ use iggy_common::MaxTopicSize;
 use iggy_common::Validatable;
 use iggy_common::{CompressionAlgorithm, ConfigurationError};
 use std::thread::available_parallelism;
-use tracing::error;
+use tracing::{error, warn};
 
 impl Validatable<ConfigurationError> for ServerConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
@@ -93,7 +93,7 @@ impl Validatable<ConfigurationError> for CompressionConfig {
         let compression_alg = &self.default_algorithm;
         if *compression_alg != CompressionAlgorithm::None {
             // TODO(numinex): Change this message once server side compression 
is fully developed.
-            println!(
+            warn!(
                 "Server started with server-side compression enabled, using 
algorithm: {compression_alg}, this feature is not implemented yet!"
             );
         }
@@ -127,7 +127,7 @@ impl Validatable<ConfigurationError> for TelemetryConfig {
 impl Validatable<ConfigurationError> for PartitionConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.messages_required_to_save < 32 {
-            eprintln!(
+            error!(
                 "Configured system.partition.messages_required_to_save {} is 
less than minimum {}",
                 self.messages_required_to_save, 32
             );
@@ -135,7 +135,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
         }
 
         if !self.messages_required_to_save.is_multiple_of(32) {
-            eprintln!(
+            error!(
                 "Configured system.partition.messages_required_to_save {} is 
not a multiple of 32",
                 self.messages_required_to_save
             );
@@ -143,7 +143,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
         }
 
         if self.size_of_messages_required_to_save < 512 {
-            eprintln!(
+            error!(
                 "Configured system.partition.size_of_messages_required_to_save 
{} is less than minimum {}",
                 self.size_of_messages_required_to_save, 512
             );
@@ -155,7 +155,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
             .as_bytes_u64()
             .is_multiple_of(512)
         {
-            eprintln!(
+            error!(
                 "Configured system.partition.size_of_messages_required_to_save 
{} is not a multiple of 512 B",
                 self.size_of_messages_required_to_save
             );
@@ -169,7 +169,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
 impl Validatable<ConfigurationError> for SegmentConfig {
     fn validate(&self) -> Result<(), ConfigurationError> {
         if self.size > SEGMENT_MAX_SIZE_BYTES {
-            eprintln!(
+            error!(
                 "Configured system.segment.size {} B is greater than maximum 
{} B",
                 self.size.as_bytes_u64(),
                 SEGMENT_MAX_SIZE_BYTES
@@ -178,7 +178,7 @@ impl Validatable<ConfigurationError> for SegmentConfig {
         }
 
         if !self.size.as_bytes_u64().is_multiple_of(512) {
-            eprintln!(
+            error!(
                 "Configured system.segment.size {} B is not a multiple of 512 
B",
                 self.size.as_bytes_u64()
             );
@@ -295,11 +295,11 @@ impl Validatable<ConfigurationError> for ShardingConfig {
             CpuAllocation::All => Ok(()),
             CpuAllocation::Count(count) => {
                 if *count == 0 {
-                    eprintln!("Invalid sharding configuration: cpu_allocation 
count cannot be 0");
+                    error!("Invalid sharding configuration: cpu_allocation 
count cannot be 0");
                     return Err(ConfigurationError::InvalidConfigurationValue);
                 }
                 if *count > available_cpus {
-                    eprintln!(
+                    error!(
                         "Invalid sharding configuration: cpu_allocation count 
{count} exceeds available CPU cores {available_cpus}"
                     );
                     return Err(ConfigurationError::InvalidConfigurationValue);
@@ -308,13 +308,13 @@ impl Validatable<ConfigurationError> for ShardingConfig {
             }
             CpuAllocation::Range(start, end) => {
                 if start >= end {
-                    eprintln!(
+                    error!(
                         "Invalid sharding configuration: cpu_allocation range 
{start}..{end} is invalid (start must be less than end)"
                     );
                     return Err(ConfigurationError::InvalidConfigurationValue);
                 }
                 if *end > available_cpus {
-                    eprintln!(
+                    error!(
                         "Invalid sharding configuration: cpu_allocation range 
{start}..{end} exceeds available CPU cores (max: {available_cpus})"
                     );
                     return Err(ConfigurationError::InvalidConfigurationValue);
@@ -333,13 +333,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
 
         // Validate cluster name is not empty
         if self.name.trim().is_empty() {
-            eprintln!("Invalid cluster configuration: cluster name cannot be 
empty");
+            error!("Invalid cluster configuration: cluster name cannot be 
empty");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
         // Validate current node name is not empty
         if self.node.current.name.trim().is_empty() {
-            eprintln!("Invalid cluster configuration: current node name cannot 
be empty");
+            error!("Invalid cluster configuration: current node name cannot be 
empty");
             return Err(ConfigurationError::InvalidConfigurationValue);
         }
 
@@ -349,7 +349,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
 
         for node in &self.node.others {
             if !node_names.insert(node.name.clone()) {
-                eprintln!(
+                error!(
                     "Invalid cluster configuration: duplicate node name '{}' 
found",
                     node.name
                 );
@@ -362,13 +362,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
         for node in &self.node.others {
             // Validate node name is not empty
             if node.name.trim().is_empty() {
-                eprintln!("Invalid cluster configuration: node name cannot be 
empty");
+                error!("Invalid cluster configuration: node name cannot be 
empty");
                 return Err(ConfigurationError::InvalidConfigurationValue);
             }
 
             // Validate IP is not empty
             if node.ip.trim().is_empty() {
-                eprintln!(
+                error!(
                     "Invalid cluster configuration: IP cannot be empty for 
node '{}'",
                     node.name
                 );
@@ -386,7 +386,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
             for (name, port_opt) in &port_list {
                 if let Some(port) = port_opt {
                     if *port == 0 {
-                        eprintln!(
+                        error!(
                             "Invalid cluster configuration: {} port cannot be 
0 for node '{}'",
                             name, node.name
                         );
@@ -396,7 +396,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
                     // Check for port conflicts across nodes on the same IP
                     let endpoint = format!("{}:{}:{}", node.ip, name, port);
                     if !used_endpoints.insert(endpoint.clone()) {
-                        eprintln!(
+                        error!(
                             "Invalid cluster configuration: port conflict - 
{}:{} is already used",
                             node.ip, port
                         );
diff --git a/core/server/src/diagnostics.rs b/core/server/src/diagnostics.rs
index ea51fe85f..6029cbf88 100644
--- a/core/server/src/diagnostics.rs
+++ b/core/server/src/diagnostics.rs
@@ -55,19 +55,22 @@ pub fn print_locked_memory_limit_info() {
     eprintln!("  1. Temporarily (current session only):");
     eprintln!("     ulimit -l unlimited");
     eprintln!();
-    eprintln!("  2. Persistently (add to /etc/security/limits.conf):");
-    eprintln!("     * soft memlock unlimited");
-    eprintln!("     * hard memlock unlimited");
-    eprintln!();
-    eprintln!("  3. For systemd services (add to service file):");
-    eprintln!("     LimitMEMLOCK=infinity");
+    eprintln!("  2. Docker run:");
+    eprintln!("     docker run --ulimit memlock=-1:-1 ...");
     eprintln!();
-    eprintln!("  4. Docker Compose (add to service):");
+    eprintln!("  3. Docker Compose (add to service):");
     eprintln!("     ulimits:");
     eprintln!("       memlock:");
     eprintln!("         soft: -1");
     eprintln!("         hard: -1");
     eprintln!();
+    eprintln!("  4. Persistently (add to /etc/security/limits.conf):");
+    eprintln!("     * soft memlock unlimited");
+    eprintln!("     * hard memlock unlimited");
+    eprintln!();
+    eprintln!("  5. For systemd services (add to service file):");
+    eprintln!("     LimitMEMLOCK=infinity");
+    eprintln!();
 }
 
 /// Prints information about io_uring permission issues in containerized 
environments.
diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs
index e2145f25b..5646d1a51 100644
--- a/core/server/src/log/logger.rs
+++ b/core/server/src/log/logger.rs
@@ -32,9 +32,8 @@ use opentelemetry_sdk::propagation::TraceContextPropagator;
 use opentelemetry_sdk::trace::span_processor_with_async_runtime;
 use std::io::{self, Write};
 use std::path::PathBuf;
-use std::str::FromStr;
 use std::sync::{Arc, Mutex};
-use tracing::{Level, event, info, trace};
+use tracing::{info, trace};
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_opentelemetry::OpenTelemetryLayer;
 use tracing_subscriber::field::{RecordFields, VisitOutput};
@@ -44,7 +43,7 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{
     EnvFilter, Layer, Registry, filter::LevelFilter, fmt, fmt::MakeWriter, 
fmt::format::Format,
-    reload, reload::Handle,
+    layer::Layered, reload, reload::Handle,
 };
 
 const IGGY_LOG_FILE_PREFIX: &str = "iggy-server.log";
@@ -109,61 +108,83 @@ impl EarlyLogDumper for Logging {
     }
 }
 
-// Make reload::Layer::new more readable
-type ReloadHandle = Handle<Box<dyn Layer<Registry> + Send + Sync>, Registry>;
+// Type aliases for reload handles with different subscriber types
+type ReloadHandle<S> = Handle<Box<dyn Layer<S> + Send + Sync>, S>;
+type EnvFilterReloadHandle = Handle<EnvFilter, Registry>;
+
+// Subscriber type after applying EnvFilter to Registry
+type FilteredRegistry = Layered<reload::Layer<EnvFilter, Registry>, Registry>;
 
 pub struct Logging {
     stdout_guard: Option<WorkerGuard>,
-    stdout_reload_handle: Option<ReloadHandle>,
+    stdout_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
 
     file_guard: Option<WorkerGuard>,
-    file_reload_handle: Option<ReloadHandle>,
+    file_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
 
-    filtering_stdout_reload_handle: Option<ReloadHandle>,
-    filtering_file_reload_handle: Option<ReloadHandle>,
+    env_filter_reload_handle: Option<EnvFilterReloadHandle>,
 
-    early_logs_buffer: Arc<Mutex<Vec<String>>>,
+    otel_logs_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
+    otel_traces_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
 
-    telemetry_config: TelemetryConfig,
+    early_logs_buffer: Arc<Mutex<Vec<String>>>,
 }
 
 impl Logging {
-    pub fn new(telemetry_config: TelemetryConfig) -> Self {
+    pub fn new() -> Self {
         Self {
             stdout_guard: None,
             stdout_reload_handle: None,
             file_guard: None,
             file_reload_handle: None,
-            filtering_stdout_reload_handle: None,
-            filtering_file_reload_handle: None,
+            env_filter_reload_handle: None,
+            otel_logs_reload_handle: None,
+            otel_traces_reload_handle: None,
             early_logs_buffer: Arc::new(Mutex::new(vec![])),
-            telemetry_config,
         }
     }
 
     pub fn early_init(&mut self) {
-        // Initialize layers
-        // First layer is filtering based on severity
-        // Second layer will just consume drain log entries and has first 
layer as a dependency
-        // Third layer will write to a safe buffer and has first layer as a 
dependency
-        // All layers will be replaced during late_init
-        let mut layers = vec![];
-
-        let filtering_level = Self::get_filtering_level(None);
-        let (filtering_stdout_layer, filtering_stdout_reload_handle) =
-            reload::Layer::new(filtering_level.boxed());
-        self.filtering_stdout_reload_handle = 
Some(filtering_stdout_reload_handle);
-
-        let (filtering_file_layer, filtering_file_reload_handle) =
-            reload::Layer::new(filtering_level.boxed());
-        self.filtering_file_reload_handle = Some(filtering_file_reload_handle);
-
+        // Initialize layers with placeholders that will be replaced during 
late_init.
+        // This allows logging to work before config is parsed.
+        //
+        // Layer structure:
+        // - EnvFilter: Applied FIRST via .with() to filter all subsequent 
layers
+        // - Stdout layer: writes to NullWriter (discarded), replaced with 
real stdout in late_init
+        // - File layer: writes to in-memory buffer, replaced with rolling 
file in late_init
+        // - Telemetry layers: no-op placeholders (LevelFilter::OFF), replaced 
if telemetry enabled
+        //
+        // EnvFilter MUST be applied first. All subsequent layers are typed 
for FilteredRegistry.
+        // This ensures the filter is evaluated before any output layer 
processes events.
+
+        // EnvFilter applied FIRST - wraps all subsequent layers.
+        // RUST_LOG takes precedence; otherwise defaults to INFO until 
late_init reloads with config.
+        let env_filter =
+            EnvFilter::try_from_default_env().unwrap_or_else(|_| 
EnvFilter::new("INFO"));
+        let (env_filter_layer, env_filter_reload_handle) = 
reload::Layer::new(env_filter);
+        self.env_filter_reload_handle = Some(env_filter_reload_handle);
+
+        // All output layers are typed for FilteredRegistry (Registry + 
EnvFilter)
+        let mut layers: Vec<Box<dyn Layer<FilteredRegistry> + Send + Sync>> = 
vec![];
+
+        // Telemetry layers - no-op placeholders (LevelFilter::OFF) replaced 
in late_init if enabled
+        let (otel_logs_layer, otel_logs_reload_handle) =
+            reload::Layer::new(LevelFilter::OFF.boxed());
+        self.otel_logs_reload_handle = Some(otel_logs_reload_handle);
+        layers.push(Box::new(otel_logs_layer));
+
+        let (otel_traces_layer, otel_traces_reload_handle) =
+            reload::Layer::new(LevelFilter::OFF.boxed());
+        self.otel_traces_reload_handle = Some(otel_traces_reload_handle);
+        layers.push(Box::new(otel_traces_layer));
+
+        // Output layers
         let stdout_layer = fmt::Layer::default()
             .event_format(Self::get_log_format())
             .with_writer(|| NullWriter);
         let (stdout_layer, stdout_layer_reload_handle) = 
reload::Layer::new(stdout_layer.boxed());
         self.stdout_reload_handle = Some(stdout_layer_reload_handle);
-        layers.push(stdout_layer.and_then(filtering_stdout_layer));
+        layers.push(Box::new(stdout_layer));
 
         let file_layer = fmt::Layer::default()
             .event_format(Self::get_log_format())
@@ -172,34 +193,115 @@ impl Logging {
             .with_ansi(false);
         let (file_layer, file_layer_reload_handle) = 
reload::Layer::new(file_layer.boxed());
         self.file_reload_handle = Some(file_layer_reload_handle);
-        layers.push(file_layer.and_then(filtering_file_layer));
-
-        if !self.telemetry_config.enabled {
-            // This is moment when we can start logging something and not 
worry about losing it.
-            Registry::default()
-                .with(layers)
-                
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
-                .init();
-            Self::print_build_info();
-            return;
+        layers.push(Box::new(file_layer));
+
+        Registry::default()
+            .with(env_filter_layer)
+            .with(layers)
+            .init();
+        Self::print_build_info();
+    }
+
+    pub fn late_init(
+        &mut self,
+        base_directory: String,
+        config: &LoggingConfig,
+        telemetry_config: &TelemetryConfig,
+    ) -> Result<(), LogError> {
+        // Write to stdout and file at the same time.
+        // Use the non_blocking appender to avoid blocking the threads.
+        // Use the rolling appender to avoid having a huge log file.
+        // Make sure logs are dumped to the file during graceful shutdown.
+
+        trace!("Logging config: {}", config);
+
+        // Reload EnvFilter with config level if RUST_LOG is not set.
+        // Config level supports EnvFilter syntax (e.g., 
"warn,server=debug,iggy=trace").
+        let log_filter = if std::env::var("RUST_LOG").is_ok() {
+            // RUST_LOG takes precedence - don't reload, keep what was set in 
early_init
+            std::env::var("RUST_LOG").unwrap()
+        } else {
+            // Use config level as EnvFilter directive
+            let env_filter = EnvFilter::new(&config.level);
+            self.env_filter_reload_handle
+                .as_ref()
+                .ok_or(LogError::FilterReloadFailure)?
+                .modify(|filter| *filter = env_filter)
+                .expect("Failed to modify EnvFilter");
+            config.level.clone()
+        };
+
+        // Initialize non-blocking stdout layer
+        let (non_blocking_stdout, stdout_guard) = 
tracing_appender::non_blocking(std::io::stdout());
+        let stdout_layer = fmt::Layer::default()
+            .with_ansi(true)
+            .event_format(Self::get_log_format())
+            .with_writer(non_blocking_stdout)
+            .boxed();
+        self.stdout_guard = Some(stdout_guard);
+
+        self.stdout_reload_handle
+            .as_ref()
+            .ok_or(LogError::StdoutReloadFailure)?
+            .modify(|layer| *layer = stdout_layer)
+            .expect("Failed to modify stdout layer");
+
+        self.dump_to_stdout();
+
+        // Initialize directory and file for logs
+        let base_directory = PathBuf::from(base_directory);
+        let logs_subdirectory = PathBuf::from(config.path.clone());
+        let logs_path = base_directory.join(logs_subdirectory.clone());
+        let file_appender =
+            tracing_appender::rolling::hourly(logs_path.clone(), 
IGGY_LOG_FILE_PREFIX);
+        let (mut non_blocking_file, file_guard) = 
tracing_appender::non_blocking(file_appender);
+
+        self.dump_to_file(&mut non_blocking_file);
+
+        let file_layer = fmt::layer()
+            .event_format(Self::get_log_format())
+            .with_target(true)
+            .with_writer(non_blocking_file)
+            .with_ansi(false)
+            .fmt_fields(NoAnsiFields {})
+            .boxed();
+
+        self.file_guard = Some(file_guard);
+        self.file_reload_handle
+            .as_ref()
+            .ok_or(LogError::FileReloadFailure)?
+            .modify(|layer| *layer = file_layer)
+            .expect("Failed to modify file layer");
+
+        // Initialize telemetry if enabled
+        if telemetry_config.enabled {
+            self.init_telemetry(telemetry_config)?;
         }
 
-        let service_name = self.telemetry_config.service_name.to_owned();
+        info!(
+            "Logging initialized, logs will be stored at: {logs_path:?}. Logs 
will be rotated hourly. Log filter: {log_filter}."
+        );
+
+        Ok(())
+    }
+
+    fn init_telemetry(&mut self, telemetry_config: &TelemetryConfig) -> 
Result<(), LogError> {
+        let service_name = telemetry_config.service_name.to_owned();
         let resource = Resource::builder()
-            .with_service_name(service_name.to_owned())
+            .with_service_name(service_name.clone())
             .with_attribute(KeyValue::new(
                 opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
                 VERSION,
             ))
             .build();
 
-        let logger_provider = match self.telemetry_config.logs.transport {
+        let logger_provider = match telemetry_config.logs.transport {
             TelemetryTransport::GRPC => 
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
                 .with_resource(resource.clone())
                 .with_batch_exporter(
                     opentelemetry_otlp::LogExporter::builder()
                         .with_tonic()
-                        
.with_endpoint(self.telemetry_config.logs.endpoint.clone())
+                        .with_endpoint(telemetry_config.logs.endpoint.clone())
                         .build()
                         .expect("Failed to initialize gRPC logger."),
                 )
@@ -208,7 +310,7 @@ impl Logging {
                 let log_exporter = opentelemetry_otlp::LogExporter::builder()
                     .with_http()
                     .with_http_client(reqwest::Client::new())
-                    .with_endpoint(self.telemetry_config.logs.endpoint.clone())
+                    .with_endpoint(telemetry_config.logs.endpoint.clone())
                     .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
                     .build()
                     .expect("Failed to initialize HTTP logger.");
@@ -225,13 +327,13 @@ impl Logging {
             }
         };
 
-        let tracer_provider = match self.telemetry_config.traces.transport {
+        let tracer_provider = match telemetry_config.traces.transport {
             TelemetryTransport::GRPC => 
opentelemetry_sdk::trace::SdkTracerProvider::builder()
                 .with_resource(resource.clone())
                 .with_batch_exporter(
                     opentelemetry_otlp::SpanExporter::builder()
                         .with_tonic()
-                        
.with_endpoint(self.telemetry_config.traces.endpoint.clone())
+                        
.with_endpoint(telemetry_config.traces.endpoint.clone())
                         .build()
                         .expect("Failed to initialize gRPC tracer."),
                 )
@@ -240,7 +342,7 @@ impl Logging {
                 let trace_exporter = 
opentelemetry_otlp::SpanExporter::builder()
                     .with_http()
                     .with_http_client(reqwest::Client::new())
-                    
.with_endpoint(self.telemetry_config.traces.endpoint.clone())
+                    .with_endpoint(telemetry_config.traces.endpoint.clone())
                     .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
                     .build()
                     .expect("Failed to initialize HTTP tracer.");
@@ -261,129 +363,26 @@ impl Logging {
         global::set_tracer_provider(tracer_provider.clone());
         global::set_text_map_propagator(TraceContextPropagator::new());
 
-        Registry::default()
-            .with(layers)
-            .with(OpenTelemetryTracingBridge::new(&logger_provider))
-            .with(OpenTelemetryLayer::new(tracer))
-            
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
-            .init();
-        Self::print_build_info();
-    }
-
-    pub fn late_init(
-        &mut self,
-        base_directory: String,
-        config: &LoggingConfig,
-    ) -> Result<(), LogError> {
-        // Write to stdout and file at the same time.
-        // Use the non_blocking appender to avoid blocking the threads.
-        // Use the rolling appender to avoid having a huge log file.
-        // Make sure logs are dumped to the file during graceful shutdown.
-
-        trace!("Logging config: {}", config);
-
-        let filtering_level = Self::get_filtering_level(Some(config));
-
-        self.filtering_stdout_reload_handle
+        // Reload telemetry layers with actual implementations
+        self.otel_logs_reload_handle
             .as_ref()
             .ok_or(LogError::FilterReloadFailure)?
-            .modify(|layer| *layer = filtering_level.boxed())
-            .expect("Failed to modify stdout filtering layer");
+            .modify(|layer| *layer = 
OpenTelemetryTracingBridge::new(&logger_provider).boxed())
+            .expect("Failed to modify telemetry logs layer");
 
-        self.filtering_file_reload_handle
+        self.otel_traces_reload_handle
             .as_ref()
             .ok_or(LogError::FilterReloadFailure)?
-            .modify(|layer| *layer = filtering_level.boxed())
-            .expect("Failed to modify file filtering layer");
-
-        // Initialize non-blocking stdout layer
-        let (non_blocking_stdout, stdout_guard) = 
tracing_appender::non_blocking(std::io::stdout());
-        let stdout_layer = fmt::Layer::default()
-            .with_ansi(true)
-            .event_format(Self::get_log_format())
-            .with_writer(non_blocking_stdout)
-            .boxed();
-        self.stdout_guard = Some(stdout_guard);
+            .modify(|layer| *layer = OpenTelemetryLayer::new(tracer).boxed())
+            .expect("Failed to modify telemetry traces layer");
 
-        self.stdout_reload_handle
-            .as_ref()
-            .ok_or(LogError::StdoutReloadFailure)?
-            .modify(|layer| *layer = stdout_layer)
-            .expect("Failed to modify stdout layer");
-
-        self.dump_to_stdout();
-
-        // Initialize directory and file for logs
-        let base_directory = PathBuf::from(base_directory);
-        let logs_subdirectory = PathBuf::from(config.path.clone());
-        let logs_path = base_directory.join(logs_subdirectory.clone());
-        let file_appender =
-            tracing_appender::rolling::hourly(logs_path.clone(), 
IGGY_LOG_FILE_PREFIX);
-        let (mut non_blocking_file, file_guard) = 
tracing_appender::non_blocking(file_appender);
-
-        self.dump_to_file(&mut non_blocking_file);
-
-        let file_layer = fmt::layer()
-            .event_format(Self::get_log_format())
-            .with_target(true)
-            .with_writer(non_blocking_file)
-            .with_ansi(false)
-            .fmt_fields(NoAnsiFields {})
-            .boxed();
-
-        self.file_guard = Some(file_guard);
-        self.file_reload_handle
-            .as_ref()
-            .ok_or(LogError::FileReloadFailure)?
-            .modify(|layer| *layer = file_layer)
-            .expect("Failed to modify file layer");
-        let level = filtering_level.to_string();
-
-        let print = format!(
-            "Logging initialized, logs will be stored at: {logs_path:?}. Logs 
will be rotated hourly. Log level is: {level}."
+        info!(
+            "Telemetry initialized with service name: {}",
+            telemetry_config.service_name
         );
-
-        match filtering_level {
-            LevelFilter::OFF => (),
-            LevelFilter::ERROR => event!(Level::ERROR, "{}", print),
-            LevelFilter::WARN => event!(Level::WARN, "{}", print),
-            LevelFilter::INFO => event!(Level::INFO, "{}", print),
-            LevelFilter::DEBUG => event!(Level::DEBUG, "{}", print),
-            LevelFilter::TRACE => event!(Level::TRACE, "{}", print),
-        }
-
         Ok(())
     }
 
-    // RUST_LOG always takes precedence over config
-    fn get_filtering_level(config: Option<&LoggingConfig>) -> LevelFilter {
-        if let Ok(rust_log) = std::env::var("RUST_LOG") {
-            // Parse log level from RUST_LOG env variable
-            if let Ok(level) = LevelFilter::from_str(&rust_log.to_uppercase()) 
{
-                level
-            } else {
-                println!("Invalid RUST_LOG value: {rust_log}, falling back to 
info");
-                LevelFilter::INFO
-            }
-        } else {
-            // Parse log level from config
-            if let Some(config) = config {
-                if let Ok(level) = 
LevelFilter::from_str(&config.level.to_uppercase()) {
-                    level
-                } else {
-                    println!(
-                        "Invalid log level in config: {}, falling back to 
info",
-                        config.level
-                    );
-                    LevelFilter::INFO
-                }
-            } else {
-                // config not provided
-                LevelFilter::INFO
-            }
-        }
-    }
-
     fn get_log_format() -> Format {
         Format::default().with_thread_names(true)
     }
@@ -412,7 +411,7 @@ impl Logging {
 
 impl Default for Logging {
     fn default() -> Self {
-        Self::new(TelemetryConfig::default())
+        Self::new()
     }
 }
 
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index d7b32f015..4d9d0f08e 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -138,6 +138,11 @@ fn main() -> Result<(), ServerError> {
         let is_follower = args.follower;
 
         // FIRST DISCRETE LOADING STEP.
+        // Initialize early logging before config parsing so we can log during 
bootstrap.
+        let mut logging = Logging::new();
+        logging.early_init();
+
+        // SECOND DISCRETE LOADING STEP.
         // Load config and create directories.
         // Remove `local_data` directory if run with `--fresh` flag.
         let config = load_config().await.with_error(|error| {
@@ -146,27 +151,28 @@ fn main() -> Result<(), ServerError> {
         if args.fresh {
             let system_path = config.system.get_system_path();
             if compio::fs::metadata(&system_path).await.is_ok() {
-                println!(
+                warn!(
                     "Removing system path at: {} because `--fresh` flag was 
set",
                     system_path
                 );
                 if let Err(e) = fs_utils::remove_dir_all(&system_path).await {
-                    eprintln!("Failed to remove system path at {}: {}", 
system_path, e);
+                    warn!("Failed to remove system path at {system_path}: 
{e}");
                 }
             }
         }
 
-        // SECOND DISCRETE LOADING STEP.
+        // THIRD DISCRETE LOADING STEP.
         // Create directories.
         create_directories(&config.system).await?;
 
-        // Initialize logging
-        // THIRD DISCRETE LOADING STEP.
-        let mut logging = Logging::new(config.telemetry.clone());
-        logging.early_init();
-
-        // From this point on, we can use tracing macros to log messages.
-        logging.late_init(config.system.get_system_path(), 
&config.system.logging)?;
+        // FOURTH DISCRETE LOADING STEP.
+        // Complete logging setup with config (file output, telemetry).
+        // From this point on, logs are persisted to file and telemetry is 
active.
+        logging.late_init(
+            config.system.get_system_path(),
+            &config.system.logging,
+            &config.telemetry,
+        )?;
 
         if is_follower {
             info!("Server is running in FOLLOWER mode for testing leader 
redirection");
@@ -198,7 +204,7 @@ fn main() -> Result<(), ServerError> {
             }
         }
 
-        // FOURTH DISCRETE LOADING STEP.
+        // FIFTH DISCRETE LOADING STEP.
         MemoryPool::init_pool(&config.system.memory_pool.into_other());
 
         // SIXTH DISCRETE LOADING STEP.


Reply via email to