This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 47ef46869 refactor(server): initialize logging before config parsing
(#2442)
47ef46869 is described below
commit 47ef46869ccfd3371559c9d9d65de1c5bd09b022
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Dec 4 15:02:05 2025 +0100
refactor(server): initialize logging before config parsing (#2442)
- Move early_init() before load_config() to capture bootstrap logs
- Move telemetry initialization from early_init() to late_init()
- Add reload handles for telemetry layers with no-op placeholders
- Replace println!/eprintln! with tracing macros in config modules
- Fix RUST_LOG filtering: apply EnvFilter before output layers
- Support full EnvFilter syntax in config level field (e.g.,
"warn,server=debug")
- Document level field syntax in server.toml
RUST_LOG environment variable always takes precedence over config.
Finally, we can again using tracing macros during config parsing.
---
core/common/src/configs/mod.rs | 21 ++-
core/configs/server.toml | 5 +-
core/server/src/configs/validators.rs | 38 ++--
core/server/src/diagnostics.rs | 17 +-
core/server/src/log/logger.rs | 327 +++++++++++++++++-----------------
core/server/src/main.rs | 28 +--
6 files changed, 225 insertions(+), 211 deletions(-)
diff --git a/core/common/src/configs/mod.rs b/core/common/src/configs/mod.rs
index 88fd498f2..bf578b38c 100644
--- a/core/common/src/configs/mod.rs
+++ b/core/common/src/configs/mod.rs
@@ -34,6 +34,7 @@ use figment::{
use serde::{Serialize, de::DeserializeOwned};
use std::{env, fmt::Display, future::Future, marker::PhantomData, path::Path};
use toml::{Value as TomlValue, map::Map as TomlMap};
+use tracing::{error, info, warn};
const SECRET_MASK: &str = "******";
const ARRAY_SEPARATOR: char = '_';
@@ -162,7 +163,7 @@ impl<T: ConfigurationType> CustomEnvProvider<T> {
value = SECRET_MASK.to_string();
}
- println!("{env_key} value changed to: {value} from environment
variable");
+ info!("{env_key} value changed to: {value} from environment
variable");
Self::insert_environment_override(&source_dict, &mut target_dict,
keys, env_var_value);
}
@@ -752,7 +753,7 @@ fn file_exists<P: AsRef<Path>>(path: P) -> bool {
impl<T: ConfigurationType, P: Provider + Clone> ConfigProvider<T> for
FileConfigProvider<P> {
async fn load_config(&self) -> Result<T, ConfigurationError> {
- println!("Loading config from path: '{}'...", self.file_path);
+ info!("Loading config from path: '{}'...", self.file_path);
// Start with the default configuration if provided
let mut config_builder = Figment::new();
@@ -760,20 +761,22 @@ impl<T: ConfigurationType, P: Provider + Clone>
ConfigProvider<T> for FileConfig
if let Some(default) = &self.default_config {
config_builder = config_builder.merge(default);
} else {
- println!("No default configuration provided.");
+ warn!("No default configuration provided.");
}
// If the config file exists, merge it into the configuration
if file_exists(&self.file_path) {
- println!("Found configuration file at path: '{}'.",
self.file_path);
+ info!("Found configuration file at path: '{}'.", self.file_path);
config_builder = config_builder.merge(Toml::file(&self.file_path));
} else {
- println!(
+ warn!(
"Configuration file not found at path: '{}'.",
self.file_path
);
if has_default {
- println!("Using default configuration as no config file was
found.");
+ info!(
+ "Using default configuration embedded into server, as no
config file was found."
+ );
}
}
@@ -785,17 +788,17 @@ impl<T: ConfigurationType, P: Provider + Clone>
ConfigProvider<T> for FileConfig
match config_result {
Ok(config) => {
- println!("Config loaded successfully.");
+ info!("Config loaded successfully.");
let display_config = env::var(DISPLAY_CONFIG_ENV)
.map(|val| val == "1" || val.to_lowercase() == "true")
.unwrap_or(self.display_config);
if display_config {
- println!("Using Config: {config}");
+ info!("Using Config: {config}");
}
Ok(config)
}
Err(e) => {
- println!("Failed to load config: {e}");
+ error!("Failed to load config: {e}");
Err(ConfigurationError::CannotLoadConfiguration)
}
}
diff --git a/core/configs/server.toml b/core/configs/server.toml
index 84c78aa22..8fde53d28 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -340,7 +340,10 @@ path = "runtime"
# Path for storing log files.
path = "logs"
-# Level of logging detail. Options: "debug", "info", "warn", "error".
+# Log filtering directive using the same syntax as the RUST_LOG environment
variable.
+# Supports simple levels ("trace", "debug", "info", "warn", "error", "off" or
"none")
+# as well as complex directives like "warn,server=debug,iggy=trace".
+# Note: RUST_LOG environment variable always takes precedence over this
setting.
level = "info"
# Maximum size of the log files before rotation.
diff --git a/core/server/src/configs/validators.rs
b/core/server/src/configs/validators.rs
index 40f829aca..ab0547dea 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -33,7 +33,7 @@ use iggy_common::MaxTopicSize;
use iggy_common::Validatable;
use iggy_common::{CompressionAlgorithm, ConfigurationError};
use std::thread::available_parallelism;
-use tracing::error;
+use tracing::{error, warn};
impl Validatable<ConfigurationError> for ServerConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
@@ -93,7 +93,7 @@ impl Validatable<ConfigurationError> for CompressionConfig {
let compression_alg = &self.default_algorithm;
if *compression_alg != CompressionAlgorithm::None {
// TODO(numinex): Change this message once server side compression
is fully developed.
- println!(
+ warn!(
"Server started with server-side compression enabled, using
algorithm: {compression_alg}, this feature is not implemented yet!"
);
}
@@ -127,7 +127,7 @@ impl Validatable<ConfigurationError> for TelemetryConfig {
impl Validatable<ConfigurationError> for PartitionConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.messages_required_to_save < 32 {
- eprintln!(
+ error!(
"Configured system.partition.messages_required_to_save {} is
less than minimum {}",
self.messages_required_to_save, 32
);
@@ -135,7 +135,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
}
if !self.messages_required_to_save.is_multiple_of(32) {
- eprintln!(
+ error!(
"Configured system.partition.messages_required_to_save {} is
not a multiple of 32",
self.messages_required_to_save
);
@@ -143,7 +143,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
}
if self.size_of_messages_required_to_save < 512 {
- eprintln!(
+ error!(
"Configured system.partition.size_of_messages_required_to_save
{} is less than minimum {}",
self.size_of_messages_required_to_save, 512
);
@@ -155,7 +155,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
.as_bytes_u64()
.is_multiple_of(512)
{
- eprintln!(
+ error!(
"Configured system.partition.size_of_messages_required_to_save
{} is not a multiple of 512 B",
self.size_of_messages_required_to_save
);
@@ -169,7 +169,7 @@ impl Validatable<ConfigurationError> for PartitionConfig {
impl Validatable<ConfigurationError> for SegmentConfig {
fn validate(&self) -> Result<(), ConfigurationError> {
if self.size > SEGMENT_MAX_SIZE_BYTES {
- eprintln!(
+ error!(
"Configured system.segment.size {} B is greater than maximum
{} B",
self.size.as_bytes_u64(),
SEGMENT_MAX_SIZE_BYTES
@@ -178,7 +178,7 @@ impl Validatable<ConfigurationError> for SegmentConfig {
}
if !self.size.as_bytes_u64().is_multiple_of(512) {
- eprintln!(
+ error!(
"Configured system.segment.size {} B is not a multiple of 512
B",
self.size.as_bytes_u64()
);
@@ -295,11 +295,11 @@ impl Validatable<ConfigurationError> for ShardingConfig {
CpuAllocation::All => Ok(()),
CpuAllocation::Count(count) => {
if *count == 0 {
- eprintln!("Invalid sharding configuration: cpu_allocation
count cannot be 0");
+ error!("Invalid sharding configuration: cpu_allocation
count cannot be 0");
return Err(ConfigurationError::InvalidConfigurationValue);
}
if *count > available_cpus {
- eprintln!(
+ error!(
"Invalid sharding configuration: cpu_allocation count
{count} exceeds available CPU cores {available_cpus}"
);
return Err(ConfigurationError::InvalidConfigurationValue);
@@ -308,13 +308,13 @@ impl Validatable<ConfigurationError> for ShardingConfig {
}
CpuAllocation::Range(start, end) => {
if start >= end {
- eprintln!(
+ error!(
"Invalid sharding configuration: cpu_allocation range
{start}..{end} is invalid (start must be less than end)"
);
return Err(ConfigurationError::InvalidConfigurationValue);
}
if *end > available_cpus {
- eprintln!(
+ error!(
"Invalid sharding configuration: cpu_allocation range
{start}..{end} exceeds available CPU cores (max: {available_cpus})"
);
return Err(ConfigurationError::InvalidConfigurationValue);
@@ -333,13 +333,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
// Validate cluster name is not empty
if self.name.trim().is_empty() {
- eprintln!("Invalid cluster configuration: cluster name cannot be
empty");
+ error!("Invalid cluster configuration: cluster name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
// Validate current node name is not empty
if self.node.current.name.trim().is_empty() {
- eprintln!("Invalid cluster configuration: current node name cannot
be empty");
+ error!("Invalid cluster configuration: current node name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
@@ -349,7 +349,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
for node in &self.node.others {
if !node_names.insert(node.name.clone()) {
- eprintln!(
+ error!(
"Invalid cluster configuration: duplicate node name '{}'
found",
node.name
);
@@ -362,13 +362,13 @@ impl Validatable<ConfigurationError> for ClusterConfig {
for node in &self.node.others {
// Validate node name is not empty
if node.name.trim().is_empty() {
- eprintln!("Invalid cluster configuration: node name cannot be
empty");
+ error!("Invalid cluster configuration: node name cannot be
empty");
return Err(ConfigurationError::InvalidConfigurationValue);
}
// Validate IP is not empty
if node.ip.trim().is_empty() {
- eprintln!(
+ error!(
"Invalid cluster configuration: IP cannot be empty for
node '{}'",
node.name
);
@@ -386,7 +386,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
for (name, port_opt) in &port_list {
if let Some(port) = port_opt {
if *port == 0 {
- eprintln!(
+ error!(
"Invalid cluster configuration: {} port cannot be
0 for node '{}'",
name, node.name
);
@@ -396,7 +396,7 @@ impl Validatable<ConfigurationError> for ClusterConfig {
// Check for port conflicts across nodes on the same IP
let endpoint = format!("{}:{}:{}", node.ip, name, port);
if !used_endpoints.insert(endpoint.clone()) {
- eprintln!(
+ error!(
"Invalid cluster configuration: port conflict -
{}:{} is already used",
node.ip, port
);
diff --git a/core/server/src/diagnostics.rs b/core/server/src/diagnostics.rs
index ea51fe85f..6029cbf88 100644
--- a/core/server/src/diagnostics.rs
+++ b/core/server/src/diagnostics.rs
@@ -55,19 +55,22 @@ pub fn print_locked_memory_limit_info() {
eprintln!(" 1. Temporarily (current session only):");
eprintln!(" ulimit -l unlimited");
eprintln!();
- eprintln!(" 2. Persistently (add to /etc/security/limits.conf):");
- eprintln!(" * soft memlock unlimited");
- eprintln!(" * hard memlock unlimited");
- eprintln!();
- eprintln!(" 3. For systemd services (add to service file):");
- eprintln!(" LimitMEMLOCK=infinity");
+ eprintln!(" 2. Docker run:");
+ eprintln!(" docker run --ulimit memlock=-1:-1 ...");
eprintln!();
- eprintln!(" 4. Docker Compose (add to service):");
+ eprintln!(" 3. Docker Compose (add to service):");
eprintln!(" ulimits:");
eprintln!(" memlock:");
eprintln!(" soft: -1");
eprintln!(" hard: -1");
eprintln!();
+ eprintln!(" 4. Persistently (add to /etc/security/limits.conf):");
+ eprintln!(" * soft memlock unlimited");
+ eprintln!(" * hard memlock unlimited");
+ eprintln!();
+ eprintln!(" 5. For systemd services (add to service file):");
+ eprintln!(" LimitMEMLOCK=infinity");
+ eprintln!();
}
/// Prints information about io_uring permission issues in containerized
environments.
diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs
index e2145f25b..5646d1a51 100644
--- a/core/server/src/log/logger.rs
+++ b/core/server/src/log/logger.rs
@@ -32,9 +32,8 @@ use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::span_processor_with_async_runtime;
use std::io::{self, Write};
use std::path::PathBuf;
-use std::str::FromStr;
use std::sync::{Arc, Mutex};
-use tracing::{Level, event, info, trace};
+use tracing::{info, trace};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::field::{RecordFields, VisitOutput};
@@ -44,7 +43,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{
EnvFilter, Layer, Registry, filter::LevelFilter, fmt, fmt::MakeWriter,
fmt::format::Format,
- reload, reload::Handle,
+ layer::Layered, reload, reload::Handle,
};
const IGGY_LOG_FILE_PREFIX: &str = "iggy-server.log";
@@ -109,61 +108,83 @@ impl EarlyLogDumper for Logging {
}
}
-// Make reload::Layer::new more readable
-type ReloadHandle = Handle<Box<dyn Layer<Registry> + Send + Sync>, Registry>;
+// Type aliases for reload handles with different subscriber types
+type ReloadHandle<S> = Handle<Box<dyn Layer<S> + Send + Sync>, S>;
+type EnvFilterReloadHandle = Handle<EnvFilter, Registry>;
+
+// Subscriber type after applying EnvFilter to Registry
+type FilteredRegistry = Layered<reload::Layer<EnvFilter, Registry>, Registry>;
pub struct Logging {
stdout_guard: Option<WorkerGuard>,
- stdout_reload_handle: Option<ReloadHandle>,
+ stdout_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
file_guard: Option<WorkerGuard>,
- file_reload_handle: Option<ReloadHandle>,
+ file_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
- filtering_stdout_reload_handle: Option<ReloadHandle>,
- filtering_file_reload_handle: Option<ReloadHandle>,
+ env_filter_reload_handle: Option<EnvFilterReloadHandle>,
- early_logs_buffer: Arc<Mutex<Vec<String>>>,
+ otel_logs_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
+ otel_traces_reload_handle: Option<ReloadHandle<FilteredRegistry>>,
- telemetry_config: TelemetryConfig,
+ early_logs_buffer: Arc<Mutex<Vec<String>>>,
}
impl Logging {
- pub fn new(telemetry_config: TelemetryConfig) -> Self {
+ pub fn new() -> Self {
Self {
stdout_guard: None,
stdout_reload_handle: None,
file_guard: None,
file_reload_handle: None,
- filtering_stdout_reload_handle: None,
- filtering_file_reload_handle: None,
+ env_filter_reload_handle: None,
+ otel_logs_reload_handle: None,
+ otel_traces_reload_handle: None,
early_logs_buffer: Arc::new(Mutex::new(vec![])),
- telemetry_config,
}
}
pub fn early_init(&mut self) {
- // Initialize layers
- // First layer is filtering based on severity
- // Second layer will just consume drain log entries and has first
layer as a dependency
- // Third layer will write to a safe buffer and has first layer as a
dependency
- // All layers will be replaced during late_init
- let mut layers = vec![];
-
- let filtering_level = Self::get_filtering_level(None);
- let (filtering_stdout_layer, filtering_stdout_reload_handle) =
- reload::Layer::new(filtering_level.boxed());
- self.filtering_stdout_reload_handle =
Some(filtering_stdout_reload_handle);
-
- let (filtering_file_layer, filtering_file_reload_handle) =
- reload::Layer::new(filtering_level.boxed());
- self.filtering_file_reload_handle = Some(filtering_file_reload_handle);
-
+ // Initialize layers with placeholders that will be replaced during
late_init.
+ // This allows logging to work before config is parsed.
+ //
+ // Layer structure:
+ // - EnvFilter: Applied FIRST via .with() to filter all subsequent
layers
+ // - Stdout layer: writes to NullWriter (discarded), replaced with
real stdout in late_init
+ // - File layer: writes to in-memory buffer, replaced with rolling
file in late_init
+ // - Telemetry layers: no-op placeholders (LevelFilter::OFF), replaced
if telemetry enabled
+ //
+ // EnvFilter MUST be applied first. All subsequent layers are typed
for FilteredRegistry.
+ // This ensures the filter is evaluated before any output layer
processes events.
+
+ // EnvFilter applied FIRST - wraps all subsequent layers.
+ // RUST_LOG takes precedence; otherwise defaults to INFO until
late_init reloads with config.
+ let env_filter =
+ EnvFilter::try_from_default_env().unwrap_or_else(|_|
EnvFilter::new("INFO"));
+ let (env_filter_layer, env_filter_reload_handle) =
reload::Layer::new(env_filter);
+ self.env_filter_reload_handle = Some(env_filter_reload_handle);
+
+ // All output layers are typed for FilteredRegistry (Registry +
EnvFilter)
+ let mut layers: Vec<Box<dyn Layer<FilteredRegistry> + Send + Sync>> =
vec![];
+
+ // Telemetry layers - no-op placeholders (LevelFilter::OFF) replaced
in late_init if enabled
+ let (otel_logs_layer, otel_logs_reload_handle) =
+ reload::Layer::new(LevelFilter::OFF.boxed());
+ self.otel_logs_reload_handle = Some(otel_logs_reload_handle);
+ layers.push(Box::new(otel_logs_layer));
+
+ let (otel_traces_layer, otel_traces_reload_handle) =
+ reload::Layer::new(LevelFilter::OFF.boxed());
+ self.otel_traces_reload_handle = Some(otel_traces_reload_handle);
+ layers.push(Box::new(otel_traces_layer));
+
+ // Output layers
let stdout_layer = fmt::Layer::default()
.event_format(Self::get_log_format())
.with_writer(|| NullWriter);
let (stdout_layer, stdout_layer_reload_handle) =
reload::Layer::new(stdout_layer.boxed());
self.stdout_reload_handle = Some(stdout_layer_reload_handle);
- layers.push(stdout_layer.and_then(filtering_stdout_layer));
+ layers.push(Box::new(stdout_layer));
let file_layer = fmt::Layer::default()
.event_format(Self::get_log_format())
@@ -172,34 +193,115 @@ impl Logging {
.with_ansi(false);
let (file_layer, file_layer_reload_handle) =
reload::Layer::new(file_layer.boxed());
self.file_reload_handle = Some(file_layer_reload_handle);
- layers.push(file_layer.and_then(filtering_file_layer));
-
- if !self.telemetry_config.enabled {
- // This is moment when we can start logging something and not
worry about losing it.
- Registry::default()
- .with(layers)
-
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
- .init();
- Self::print_build_info();
- return;
+ layers.push(Box::new(file_layer));
+
+ Registry::default()
+ .with(env_filter_layer)
+ .with(layers)
+ .init();
+ Self::print_build_info();
+ }
+
+ pub fn late_init(
+ &mut self,
+ base_directory: String,
+ config: &LoggingConfig,
+ telemetry_config: &TelemetryConfig,
+ ) -> Result<(), LogError> {
+ // Write to stdout and file at the same time.
+ // Use the non_blocking appender to avoid blocking the threads.
+ // Use the rolling appender to avoid having a huge log file.
+ // Make sure logs are dumped to the file during graceful shutdown.
+
+ trace!("Logging config: {}", config);
+
+ // Reload EnvFilter with config level if RUST_LOG is not set.
+ // Config level supports EnvFilter syntax (e.g.,
"warn,server=debug,iggy=trace").
+ let log_filter = if std::env::var("RUST_LOG").is_ok() {
+ // RUST_LOG takes precedence - don't reload, keep what was set in
early_init
+ std::env::var("RUST_LOG").unwrap()
+ } else {
+ // Use config level as EnvFilter directive
+ let env_filter = EnvFilter::new(&config.level);
+ self.env_filter_reload_handle
+ .as_ref()
+ .ok_or(LogError::FilterReloadFailure)?
+ .modify(|filter| *filter = env_filter)
+ .expect("Failed to modify EnvFilter");
+ config.level.clone()
+ };
+
+ // Initialize non-blocking stdout layer
+ let (non_blocking_stdout, stdout_guard) =
tracing_appender::non_blocking(std::io::stdout());
+ let stdout_layer = fmt::Layer::default()
+ .with_ansi(true)
+ .event_format(Self::get_log_format())
+ .with_writer(non_blocking_stdout)
+ .boxed();
+ self.stdout_guard = Some(stdout_guard);
+
+ self.stdout_reload_handle
+ .as_ref()
+ .ok_or(LogError::StdoutReloadFailure)?
+ .modify(|layer| *layer = stdout_layer)
+ .expect("Failed to modify stdout layer");
+
+ self.dump_to_stdout();
+
+ // Initialize directory and file for logs
+ let base_directory = PathBuf::from(base_directory);
+ let logs_subdirectory = PathBuf::from(config.path.clone());
+ let logs_path = base_directory.join(logs_subdirectory.clone());
+ let file_appender =
+ tracing_appender::rolling::hourly(logs_path.clone(),
IGGY_LOG_FILE_PREFIX);
+ let (mut non_blocking_file, file_guard) =
tracing_appender::non_blocking(file_appender);
+
+ self.dump_to_file(&mut non_blocking_file);
+
+ let file_layer = fmt::layer()
+ .event_format(Self::get_log_format())
+ .with_target(true)
+ .with_writer(non_blocking_file)
+ .with_ansi(false)
+ .fmt_fields(NoAnsiFields {})
+ .boxed();
+
+ self.file_guard = Some(file_guard);
+ self.file_reload_handle
+ .as_ref()
+ .ok_or(LogError::FileReloadFailure)?
+ .modify(|layer| *layer = file_layer)
+ .expect("Failed to modify file layer");
+
+ // Initialize telemetry if enabled
+ if telemetry_config.enabled {
+ self.init_telemetry(telemetry_config)?;
}
- let service_name = self.telemetry_config.service_name.to_owned();
+ info!(
+ "Logging initialized, logs will be stored at: {logs_path:?}. Logs
will be rotated hourly. Log filter: {log_filter}."
+ );
+
+ Ok(())
+ }
+
+ fn init_telemetry(&mut self, telemetry_config: &TelemetryConfig) ->
Result<(), LogError> {
+ let service_name = telemetry_config.service_name.to_owned();
let resource = Resource::builder()
- .with_service_name(service_name.to_owned())
+ .with_service_name(service_name.clone())
.with_attribute(KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
VERSION,
))
.build();
- let logger_provider = match self.telemetry_config.logs.transport {
+ let logger_provider = match telemetry_config.logs.transport {
TelemetryTransport::GRPC =>
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(
opentelemetry_otlp::LogExporter::builder()
.with_tonic()
-
.with_endpoint(self.telemetry_config.logs.endpoint.clone())
+ .with_endpoint(telemetry_config.logs.endpoint.clone())
.build()
.expect("Failed to initialize gRPC logger."),
)
@@ -208,7 +310,7 @@ impl Logging {
let log_exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_http_client(reqwest::Client::new())
- .with_endpoint(self.telemetry_config.logs.endpoint.clone())
+ .with_endpoint(telemetry_config.logs.endpoint.clone())
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("Failed to initialize HTTP logger.");
@@ -225,13 +327,13 @@ impl Logging {
}
};
- let tracer_provider = match self.telemetry_config.traces.transport {
+ let tracer_provider = match telemetry_config.traces.transport {
TelemetryTransport::GRPC =>
opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(
opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
-
.with_endpoint(self.telemetry_config.traces.endpoint.clone())
+
.with_endpoint(telemetry_config.traces.endpoint.clone())
.build()
.expect("Failed to initialize gRPC tracer."),
)
@@ -240,7 +342,7 @@ impl Logging {
let trace_exporter =
opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(reqwest::Client::new())
-
.with_endpoint(self.telemetry_config.traces.endpoint.clone())
+ .with_endpoint(telemetry_config.traces.endpoint.clone())
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("Failed to initialize HTTP tracer.");
@@ -261,129 +363,26 @@ impl Logging {
global::set_tracer_provider(tracer_provider.clone());
global::set_text_map_propagator(TraceContextPropagator::new());
- Registry::default()
- .with(layers)
- .with(OpenTelemetryTracingBridge::new(&logger_provider))
- .with(OpenTelemetryLayer::new(tracer))
-
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
- .init();
- Self::print_build_info();
- }
-
- pub fn late_init(
- &mut self,
- base_directory: String,
- config: &LoggingConfig,
- ) -> Result<(), LogError> {
- // Write to stdout and file at the same time.
- // Use the non_blocking appender to avoid blocking the threads.
- // Use the rolling appender to avoid having a huge log file.
- // Make sure logs are dumped to the file during graceful shutdown.
-
- trace!("Logging config: {}", config);
-
- let filtering_level = Self::get_filtering_level(Some(config));
-
- self.filtering_stdout_reload_handle
+ // Reload telemetry layers with actual implementations
+ self.otel_logs_reload_handle
.as_ref()
.ok_or(LogError::FilterReloadFailure)?
- .modify(|layer| *layer = filtering_level.boxed())
- .expect("Failed to modify stdout filtering layer");
+ .modify(|layer| *layer =
OpenTelemetryTracingBridge::new(&logger_provider).boxed())
+ .expect("Failed to modify telemetry logs layer");
- self.filtering_file_reload_handle
+ self.otel_traces_reload_handle
.as_ref()
.ok_or(LogError::FilterReloadFailure)?
- .modify(|layer| *layer = filtering_level.boxed())
- .expect("Failed to modify file filtering layer");
-
- // Initialize non-blocking stdout layer
- let (non_blocking_stdout, stdout_guard) =
tracing_appender::non_blocking(std::io::stdout());
- let stdout_layer = fmt::Layer::default()
- .with_ansi(true)
- .event_format(Self::get_log_format())
- .with_writer(non_blocking_stdout)
- .boxed();
- self.stdout_guard = Some(stdout_guard);
+ .modify(|layer| *layer = OpenTelemetryLayer::new(tracer).boxed())
+ .expect("Failed to modify telemetry traces layer");
- self.stdout_reload_handle
- .as_ref()
- .ok_or(LogError::StdoutReloadFailure)?
- .modify(|layer| *layer = stdout_layer)
- .expect("Failed to modify stdout layer");
-
- self.dump_to_stdout();
-
- // Initialize directory and file for logs
- let base_directory = PathBuf::from(base_directory);
- let logs_subdirectory = PathBuf::from(config.path.clone());
- let logs_path = base_directory.join(logs_subdirectory.clone());
- let file_appender =
- tracing_appender::rolling::hourly(logs_path.clone(),
IGGY_LOG_FILE_PREFIX);
- let (mut non_blocking_file, file_guard) =
tracing_appender::non_blocking(file_appender);
-
- self.dump_to_file(&mut non_blocking_file);
-
- let file_layer = fmt::layer()
- .event_format(Self::get_log_format())
- .with_target(true)
- .with_writer(non_blocking_file)
- .with_ansi(false)
- .fmt_fields(NoAnsiFields {})
- .boxed();
-
- self.file_guard = Some(file_guard);
- self.file_reload_handle
- .as_ref()
- .ok_or(LogError::FileReloadFailure)?
- .modify(|layer| *layer = file_layer)
- .expect("Failed to modify file layer");
- let level = filtering_level.to_string();
-
- let print = format!(
- "Logging initialized, logs will be stored at: {logs_path:?}. Logs
will be rotated hourly. Log level is: {level}."
+ info!(
+ "Telemetry initialized with service name: {}",
+ telemetry_config.service_name
);
-
- match filtering_level {
- LevelFilter::OFF => (),
- LevelFilter::ERROR => event!(Level::ERROR, "{}", print),
- LevelFilter::WARN => event!(Level::WARN, "{}", print),
- LevelFilter::INFO => event!(Level::INFO, "{}", print),
- LevelFilter::DEBUG => event!(Level::DEBUG, "{}", print),
- LevelFilter::TRACE => event!(Level::TRACE, "{}", print),
- }
-
Ok(())
}
- // RUST_LOG always takes precedence over config
- fn get_filtering_level(config: Option<&LoggingConfig>) -> LevelFilter {
- if let Ok(rust_log) = std::env::var("RUST_LOG") {
- // Parse log level from RUST_LOG env variable
- if let Ok(level) = LevelFilter::from_str(&rust_log.to_uppercase())
{
- level
- } else {
- println!("Invalid RUST_LOG value: {rust_log}, falling back to
info");
- LevelFilter::INFO
- }
- } else {
- // Parse log level from config
- if let Some(config) = config {
- if let Ok(level) =
LevelFilter::from_str(&config.level.to_uppercase()) {
- level
- } else {
- println!(
- "Invalid log level in config: {}, falling back to
info",
- config.level
- );
- LevelFilter::INFO
- }
- } else {
- // config not provided
- LevelFilter::INFO
- }
- }
- }
-
fn get_log_format() -> Format {
Format::default().with_thread_names(true)
}
@@ -412,7 +411,7 @@ impl Logging {
impl Default for Logging {
fn default() -> Self {
- Self::new(TelemetryConfig::default())
+ Self::new()
}
}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index d7b32f015..4d9d0f08e 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -138,6 +138,11 @@ fn main() -> Result<(), ServerError> {
let is_follower = args.follower;
// FIRST DISCRETE LOADING STEP.
+ // Initialize early logging before config parsing so we can log during
bootstrap.
+ let mut logging = Logging::new();
+ logging.early_init();
+
+ // SECOND DISCRETE LOADING STEP.
// Load config and create directories.
// Remove `local_data` directory if run with `--fresh` flag.
let config = load_config().await.with_error(|error| {
@@ -146,27 +151,28 @@ fn main() -> Result<(), ServerError> {
if args.fresh {
let system_path = config.system.get_system_path();
if compio::fs::metadata(&system_path).await.is_ok() {
- println!(
+ warn!(
"Removing system path at: {} because `--fresh` flag was
set",
system_path
);
if let Err(e) = fs_utils::remove_dir_all(&system_path).await {
- eprintln!("Failed to remove system path at {}: {}",
system_path, e);
+ warn!("Failed to remove system path at {system_path}:
{e}");
}
}
}
- // SECOND DISCRETE LOADING STEP.
+ // THIRD DISCRETE LOADING STEP.
// Create directories.
create_directories(&config.system).await?;
- // Initialize logging
- // THIRD DISCRETE LOADING STEP.
- let mut logging = Logging::new(config.telemetry.clone());
- logging.early_init();
-
- // From this point on, we can use tracing macros to log messages.
- logging.late_init(config.system.get_system_path(),
&config.system.logging)?;
+ // FOURTH DISCRETE LOADING STEP.
+ // Complete logging setup with config (file output, telemetry).
+ // From this point on, logs are persisted to file and telemetry is
active.
+ logging.late_init(
+ config.system.get_system_path(),
+ &config.system.logging,
+ &config.telemetry,
+ )?;
if is_follower {
info!("Server is running in FOLLOWER mode for testing leader
redirection");
@@ -198,7 +204,7 @@ fn main() -> Result<(), ServerError> {
}
}
- // FOURTH DISCRETE LOADING STEP.
+ // FIFTH DISCRETE LOADING STEP.
MemoryPool::init_pool(&config.system.memory_pool.into_other());
// SIXTH DISCRETE LOADING STEP.