This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_test
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_test by this push:
     new 785266c1 Common config loader WiP
785266c1 is described below

commit 785266c1cabaee76def39e0dae437531202939e3
Author: spetz <[email protected]>
AuthorDate: Sun Sep 21 23:16:33 2025 +0200

    Common config loader WiP
---
 Cargo.lock                                         |   4 +
 Cargo.toml                                         |   1 +
 DEPENDENCIES.md                                    |  22 +++
 core/common/Cargo.toml                             |   2 +
 .../src/configs/mod.rs}                            | 184 +++++++++------------
 core/common/src/lib.rs                             |   3 +
 core/connectors/runtime/Cargo.toml                 |   2 +
 core/connectors/runtime/config.toml                |   6 +-
 core/connectors/runtime/src/configs.rs             |  54 +++++-
 core/connectors/runtime/src/context.rs             |   2 +-
 core/connectors/runtime/src/main.rs                |  16 +-
 core/connectors/sinks/postgres_sink/src/lib.rs     |   5 +
 core/integration/src/test_connectors_runtime.rs    |  13 +-
 core/integration/src/test_server.rs                |  16 +-
 core/integration/tests/config_provider/mod.rs      |  39 +++--
 core/integration/tests/connectors/mod.rs           |  21 +--
 .../tests/connectors/{sinks => }/postgres/mod.rs   |  16 +-
 .../connectors/{sinks => }/postgres/postgres.toml  |   0
 .../postgres/mod.rs => postgres/postgres_sink.rs}  |   0
 .../mod.rs => postgres/postgres_source.rs}         |   0
 core/integration/tests/connectors/sinks/mod.rs     |  19 ---
 core/integration/tests/connectors/sources/mod.rs   |  19 ---
 core/server/Cargo.toml                             |   2 +-
 core/server/src/configs/mod.rs                     |   1 -
 core/server/src/configs/server.rs                  |  85 +++++++++-
 core/server/src/main.rs                            |   6 +-
 26 files changed, 324 insertions(+), 214 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 75de1710..192783f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4043,9 +4043,11 @@ dependencies = [
  "dlopen2",
  "dotenvy",
  "figlet-rs",
+ "figment",
  "flume",
  "futures",
  "iggy",
+ "iggy_common",
  "iggy_connector_sdk",
  "mimalloc",
  "once_cell",
@@ -4120,6 +4122,7 @@ dependencies = [
  "crc32fast",
  "derive_more 2.0.1",
  "fast-async-mutex",
+ "figment",
  "humantime",
  "rcgen",
  "rustls",
@@ -4129,6 +4132,7 @@ dependencies = [
  "strum",
  "thiserror 2.0.16",
  "tokio",
+ "toml 0.9.6",
  "tracing",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index 1ab0abee..1f60b229 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ dotenvy = "0.15.7"
 enum_dispatch = "0.3.13"
 env_logger = "0.11.8"
 figlet-rs = "0.1.5"
+figment = { version = "0.10.19", features = ["toml", "env"] }
 flume = "0.11.1"
 futures = "0.3.31"
 futures-util = "0.3.31"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 34a7bc6d..e370afda 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -84,6 +84,9 @@ bitvec: 1.0.1, "MIT",
 blake2: 0.10.6, "Apache-2.0 OR MIT",
 blake3: 1.8.2, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0",
 block-buffer: 0.10.4, "Apache-2.0 OR MIT",
+bollard: 0.19.1, "Apache-2.0",
+bollard-buildkit-proto: 0.6.1, "Apache-2.0",
+bollard-stubs: 1.48.3-rc.28.0.4, "Apache-2.0",
 bon: 3.7.2, "Apache-2.0 OR MIT",
 bon-macros: 3.7.2, "Apache-2.0 OR MIT",
 boolinator: 2.4.0, "Apache-2.0 OR MIT",
@@ -196,6 +199,7 @@ dlopen2: 0.8.0, "Custom License File",
 dlopen2_derive: 0.4.1, "Custom License File",
 dlv-list: 0.5.2, "Apache-2.0 OR MIT",
 doc-comment: 0.3.3, "MIT",
+docker_credential: 1.3.2, "Apache-2.0 OR MIT",
 document-features: 0.2.11, "Apache-2.0 OR MIT",
 dotenvy: 0.15.7, "MIT",
 downcast: 0.11.0, "MIT",
@@ -220,6 +224,7 @@ errno: 0.3.14, "Apache-2.0 OR MIT",
 error_set: 0.8.5, "Apache-2.0",
 error_set_impl: 0.8.5, "Apache-2.0",
 etcetera: 0.8.0, "Apache-2.0 OR MIT",
+etcetera: 0.10.0, "Apache-2.0 OR MIT",
 event-listener: 5.4.1, "Apache-2.0 OR MIT",
 event-listener-strategy: 0.5.4, "Apache-2.0 OR MIT",
 ext-trait: 1.0.1, "Apache-2.0 OR MIT OR Zlib",
@@ -231,6 +236,7 @@ fastrand: 2.3.0, "Apache-2.0 OR MIT",
 figlet-rs: 0.1.5, "Apache-2.0",
 figment: 0.10.19, "Apache-2.0 OR MIT",
 file-operation: 0.8.4, "MIT",
+filetime: 0.2.26, "Apache-2.0 OR MIT",
 find-msvc-tools: 0.1.1, "Apache-2.0 OR MIT",
 flatbuffers: 25.2.10, "Apache-2.0",
 flate2: 1.1.2, "Apache-2.0 OR MIT",
@@ -327,9 +333,11 @@ httpdate: 1.0.3, "Apache-2.0 OR MIT",
 human-repr: 1.1.0, "MIT",
 humantime: 2.3.0, "Apache-2.0 OR MIT",
 hyper: 1.7.0, "MIT",
+hyper-named-pipe: 0.1.0, "Apache-2.0",
 hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT",
 hyper-timeout: 0.5.2, "Apache-2.0 OR MIT",
 hyper-util: 0.1.17, "MIT",
+hyperlocal: 0.9.1, "MIT",
 iana-time-zone: 0.1.64, "Apache-2.0 OR MIT",
 iana-time-zone-haiku: 0.1.2, "Apache-2.0 OR MIT",
 icu_collections: 2.0.0, "Unicode-3.0",
@@ -460,13 +468,16 @@ nougat: 0.2.4, "Apache-2.0 OR MIT OR Zlib",
 nougat-proc_macros: 0.2.4, "Apache-2.0 OR MIT OR Zlib",
 ntapi: 0.4.1, "Apache-2.0 OR MIT",
 nu-ansi-term: 0.50.1, "MIT",
+num: 0.4.3, "Apache-2.0 OR MIT",
 num-bigint: 0.4.6, "Apache-2.0 OR MIT",
 num-bigint-dig: 0.8.4, "Apache-2.0 OR MIT",
+num-complex: 0.4.6, "Apache-2.0 OR MIT",
 num-conv: 0.1.0, "Apache-2.0 OR MIT",
 num-integer: 0.1.46, "Apache-2.0 OR MIT",
 num-iter: 0.1.45, "Apache-2.0 OR MIT",
 num-modular: 0.6.1, "Apache-2.0",
 num-order: 1.2.0, "Apache-2.0",
+num-rational: 0.4.2, "Apache-2.0 OR MIT",
 num-traits: 0.2.19, "Apache-2.0 OR MIT",
 num_cpus: 1.17.0, "Apache-2.0 OR MIT",
 num_threads: 0.1.7, "Apache-2.0 OR MIT",
@@ -496,6 +507,8 @@ parking_lot: 0.11.2, "Apache-2.0 OR MIT",
 parking_lot: 0.12.4, "Apache-2.0 OR MIT",
 parking_lot_core: 0.8.6, "Apache-2.0 OR MIT",
 parking_lot_core: 0.9.11, "Apache-2.0 OR MIT",
+parse-display: 0.9.1, "Apache-2.0 OR MIT",
+parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
 passterm: 2.0.1, "BSD-3-Clause",
 password-hash: 0.5.0, "Apache-2.0 OR MIT",
 paste: 1.0.15, "Apache-2.0 OR MIT",
@@ -573,6 +586,7 @@ rayon: 1.11.0, "Apache-2.0 OR MIT",
 rayon-core: 1.13.0, "Apache-2.0 OR MIT",
 rcgen: 0.14.4, "Apache-2.0 OR MIT",
 redox_syscall: 0.2.16, "MIT",
+redox_syscall: 0.3.5, "MIT",
 redox_syscall: 0.5.17, "MIT",
 redox_users: 0.5.2, "MIT",
 ref-cast: 1.0.24, "Apache-2.0 OR MIT",
@@ -637,6 +651,7 @@ serde_derive: 1.0.225, "Apache-2.0 OR MIT",
 serde_derive_internals: 0.29.1, "Apache-2.0 OR MIT",
 serde_json: 1.0.145, "Apache-2.0 OR MIT",
 serde_path_to_error: 0.1.20, "Apache-2.0 OR MIT",
+serde_repr: 0.1.20, "Apache-2.0 OR MIT",
 serde_spanned: 0.6.9, "Apache-2.0 OR MIT",
 serde_spanned: 1.0.1, "Apache-2.0 OR MIT",
 serde_urlencoded: 0.7.1, "Apache-2.0 OR MIT",
@@ -681,6 +696,8 @@ static-toml: 1.3.0, "MIT",
 static_assertions: 1.1.0, "Apache-2.0 OR MIT",
 stringprep: 0.1.5, "Apache-2.0 OR MIT",
 strsim: 0.11.1, "MIT",
+structmeta: 0.3.0, "Apache-2.0 OR MIT",
+structmeta-derive: 0.3.0, "Apache-2.0 OR MIT",
 strum: 0.27.2, "MIT",
 strum_macros: 0.27.2, "MIT",
 subtle: 2.6.1, "BSD-3-Clause",
@@ -701,6 +718,8 @@ termtree: 0.5.1, "MIT",
 test-case: 3.3.1, "MIT",
 test-case-core: 3.3.1, "MIT",
 test-case-macros: 3.3.1, "MIT",
+testcontainers: 0.25.0, "Apache-2.0 OR MIT",
+testcontainers-modules: 0.13.0, "MIT",
 textwrap: 0.16.2, "MIT",
 thiserror: 1.0.69, "Apache-2.0 OR MIT",
 thiserror: 2.0.16, "Apache-2.0 OR MIT",
@@ -718,6 +737,7 @@ tokio: 1.47.1, "MIT",
 tokio-macros: 2.5.0, "MIT",
 tokio-rustls: 0.26.3, "Apache-2.0 OR MIT",
 tokio-stream: 0.1.17, "MIT",
+tokio-tar: 0.3.1, "Apache-2.0 OR MIT",
 tokio-util: 0.7.16, "MIT",
 toml: 0.8.23, "Apache-2.0 OR MIT",
 toml: 0.9.6, "Apache-2.0 OR MIT",
@@ -767,6 +787,7 @@ unicode-xid: 0.2.6, "Apache-2.0 OR MIT",
 universal-hash: 0.5.1, "Apache-2.0 OR MIT",
 untrusted: 0.9.0, "ISC",
 unty: 0.0.4, "Apache-2.0 OR MIT",
+ureq: 2.12.1, "Apache-2.0 OR MIT",
 url: 2.5.7, "Apache-2.0 OR MIT",
 urlencoding: 2.1.3, "MIT",
 utf8-width: 0.1.7, "MIT",
@@ -872,6 +893,7 @@ winnow: 0.7.13, "MIT",
 wit-bindgen: 0.46.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 writeable: 0.6.1, "Unicode-3.0",
 wyz: 0.5.1, "MIT",
+xattr: 1.5.1, "Apache-2.0 OR MIT",
 yaml-rust2: 0.10.4, "Apache-2.0 OR MIT",
 yansi: 1.0.1, "Apache-2.0 OR MIT",
 yasna: 0.5.2, "Apache-2.0 OR MIT",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 48affb57..edff331e 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -45,6 +45,7 @@ comfy-table = { workspace = true }
 crc32fast = { workspace = true }
 derive_more = { workspace = true }
 fast-async-mutex = { version = "0.6.7", optional = true }
+figment = { workspace = true }
 humantime = { workspace = true }
 rcgen = "0.14.4"
 rustls = { workspace = true }
@@ -54,4 +55,5 @@ serde_with = { workspace = true, features = ["base64"] }
 strum = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
+toml = { workspace = true }
 tracing = { workspace = true }
diff --git a/core/server/src/configs/config_provider.rs b/core/common/src/configs/mod.rs
similarity index 80%
rename from core/server/src/configs/config_provider.rs
rename to core/common/src/configs/mod.rs
index b77ec4bd..c4cb8584 100644
--- a/core/server/src/configs/config_provider.rs
+++ b/core/common/src/configs/mod.rs
@@ -16,64 +16,92 @@
  * under the License.
  */
 
-use crate::IGGY_ROOT_PASSWORD_ENV;
-use crate::configs::server::ServerConfig;
-use crate::server_error::ConfigError;
 use figment::{
-    Error, Figment, Metadata, Profile, Provider,
-    providers::{Format, Toml},
+    Figment, Profile, Provider,
+    providers::{Data, Format, Toml},
     value::{Dict, Map as FigmentMap, Tag, Value as FigmentValue},
 };
-use std::{env, future::Future, path::Path};
+use serde::{Serialize, de::DeserializeOwned};
+use std::{env, fmt::Display, future::Future, marker::PhantomData, path::Path};
 use toml::{Value as TomlValue, map::Map};
 
-const DEFAULT_CONFIG_PROVIDER: &str = "file";
-const DEFAULT_CONFIG_PATH: &str = "configs/server.toml";
-const SECRET_KEYS: [&str; 6] = [
-    IGGY_ROOT_PASSWORD_ENV,
-    "IGGY_DATA_MAINTENANCE_ARCHIVER_S3_KEY_SECRET",
-    "IGGY_HTTP_JWT_ENCODING_SECRET",
-    "IGGY_HTTP_JWT_DECODING_SECRET",
-    "IGGY_TCP_TLS_PASSWORD",
-    "IGGY_SYSTEM_ENCRYPTION_KEY",
-];
-
-pub enum ConfigProviderKind {
-    File(FileConfigProvider),
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ConfigurationError {
+    CannotLoadConfiguration,
 }
 
-impl ConfigProviderKind {
-    pub async fn load_config(&self) -> Result<ServerConfig, ConfigError> {
-        match self {
-            Self::File(p) => p.load_config().await,
-        }
-    }
-}
-
-pub trait ConfigProvider {
-    fn load_config(&self) -> impl Future<Output = Result<ServerConfig, 
ConfigError>>;
+pub trait ConfigProvider<T: Serialize + DeserializeOwned + Default + Display> {
+    fn load_config(self) -> impl Future<Output = Result<T, 
ConfigurationError>>;
 }
 
-#[derive(Debug)]
-pub struct FileConfigProvider {
+pub struct FileConfigProvider<T: Provider> {
     path: String,
+    default_config: Data<Toml>,
+    env_provider: T,
 }
 
-pub struct CustomEnvProvider {
-    prefix: String,
+impl<T: Provider> FileConfigProvider<T> {
+    pub fn new(path: String, default_config: Data<Toml>, env_provider: T) -> 
Self {
+        Self {
+            path,
+            default_config,
+            env_provider,
+        }
+    }
 }
 
-impl FileConfigProvider {
-    pub fn new(path: String) -> Self {
-        Self { path }
-    }
+#[derive(Debug, Clone)]
+pub struct CustomEnvProvider<T: Serialize + DeserializeOwned + Default + 
Display> {
+    prefix: String,
+    secret_keys: Vec<String>,
+    _data: PhantomData<T>,
 }
 
-impl CustomEnvProvider {
-    pub fn new(prefix: &str) -> Self {
+impl<T: Serialize + DeserializeOwned + Default + Display> CustomEnvProvider<T> 
{
+    pub fn new(prefix: &str, secret_keys: &[&str]) -> Self {
         Self {
             prefix: prefix.to_string(),
+            secret_keys: secret_keys.iter().map(|s| s.to_string()).collect(),
+            _data: PhantomData,
+        }
+    }
+
+    pub fn deserialize(&self) -> Result<FigmentMap<Profile, Dict>, 
ConfigurationError> {
+        let default_config = toml::to_string(&T::default())
+            .expect("Cannot serialize default Config. Something's terribly 
wrong.");
+        let toml_value: TomlValue = toml::from_str(&default_config)
+            .expect("Cannot parse default Config. Something's terribly 
wrong.");
+        let mut source_dict = Dict::new();
+        if let TomlValue::Table(table) = toml_value {
+            Self::walk_toml_table_to_dict("", table, &mut source_dict);
+        }
+
+        let mut new_dict = Dict::new();
+        for (key, mut value) in env::vars() {
+            let env_key = key.to_uppercase();
+            if !env_key.starts_with(self.prefix.as_str()) {
+                continue;
+            }
+            let keys: Vec<String> = env_key[self.prefix.len()..]
+                .split('_')
+                .map(|k| k.to_lowercase())
+                .collect();
+            let env_var_value = Self::try_parse_value(&value);
+            if self.secret_keys.contains(&env_key) {
+                value = "******".to_string();
+            }
+
+            println!("{env_key} value changed to: {value} from environment 
variable");
+            Self::insert_overridden_values_from_env(
+                &source_dict,
+                &mut new_dict,
+                keys.clone(),
+                env_var_value.clone(),
+            );
         }
+        let mut data = FigmentMap::new();
+        data.insert(Profile::default(), new_dict);
+        Ok(data)
     }
 
     fn walk_toml_table_to_dict(prefix: &str, table: Map<String, TomlValue>, 
dict: &mut Dict) {
@@ -316,63 +344,6 @@ impl CustomEnvProvider {
     }
 }
 
-impl Provider for CustomEnvProvider {
-    fn metadata(&self) -> Metadata {
-        Metadata::named("iggy-server config")
-    }
-
-    fn data(&self) -> Result<FigmentMap<Profile, Dict>, Error> {
-        let default_config = toml::to_string(&ServerConfig::default())
-            .expect("Cannot serialize default ServerConfig. Something's 
terribly wrong.");
-        let toml_value: TomlValue = toml::from_str(&default_config).unwrap();
-        let mut source_dict = Dict::new();
-        if let TomlValue::Table(table) = toml_value {
-            Self::walk_toml_table_to_dict("", table, &mut source_dict);
-        }
-
-        let mut new_dict = Dict::new();
-        for (key, mut value) in env::vars() {
-            let env_key = key.to_uppercase();
-            if !env_key.starts_with(self.prefix.as_str()) {
-                continue;
-            }
-            let keys: Vec<String> = env_key[self.prefix.len()..]
-                .split('_')
-                .map(|k| k.to_lowercase())
-                .collect();
-            let env_var_value = Self::try_parse_value(&value);
-            if SECRET_KEYS.contains(&env_key.as_str()) {
-                value = "******".to_string();
-            }
-
-            println!("{env_key} value changed to: {value} from environment 
variable");
-            Self::insert_overridden_values_from_env(
-                &source_dict,
-                &mut new_dict,
-                keys.clone(),
-                env_var_value.clone(),
-            );
-        }
-        let mut data = FigmentMap::new();
-        data.insert(Profile::default(), new_dict);
-
-        Ok(data)
-    }
-}
-
-pub fn resolve(config_provider_type: &str) -> Result<ConfigProviderKind, 
ConfigError> {
-    match config_provider_type {
-        DEFAULT_CONFIG_PROVIDER => {
-            let path =
-                env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| 
DEFAULT_CONFIG_PATH.to_string());
-            Ok(ConfigProviderKind::File(FileConfigProvider::new(path)))
-        }
-        _ => Err(ConfigError::InvalidConfigurationProvider {
-            provider_type: config_provider_type.to_string(),
-        }),
-    }
-}
-
 /// This does exactly the same as Figment does internally.
 fn file_exists<P: AsRef<Path>>(path: P) -> bool {
     let path = path.as_ref();
@@ -400,32 +371,31 @@ fn file_exists<P: AsRef<Path>>(path: P) -> bool {
     }
 }
 
-impl ConfigProvider for FileConfigProvider {
-    async fn load_config(&self) -> Result<ServerConfig, ConfigError> {
+impl<T: Serialize + DeserializeOwned + Default + Display, P: Provider + Clone> 
ConfigProvider<T>
+    for FileConfigProvider<P>
+{
+    async fn load_config(self) -> Result<T, ConfigurationError> {
         println!("Loading config from path: '{}'...", self.path);
 
-        // Include the default configuration from server.toml
-        let embedded_default_config = 
Toml::string(include_str!("../../../configs/server.toml"));
-
         // Start with the default configuration
-        let mut config_builder = Figment::new().merge(embedded_default_config);
+        let mut config_builder = Figment::new().merge(self.default_config);
 
-        // If the server.toml file exists, merge it into the configuration
+        // If the config file exists, merge it into the configuration
         if file_exists(&self.path) {
             println!("Found configuration file at path: '{}'.", self.path);
             config_builder = config_builder.merge(Toml::file(&self.path));
         } else {
             println!(
-                "Configuration file not found at path: '{}'. Using default 
configuration from embedded server.toml.",
+                "Configuration file not found at path: '{}'. Using default 
configuration from embedded config",
                 self.path
             );
         }
 
         // Merge environment variables into the configuration
-        config_builder = config_builder.merge(CustomEnvProvider::new("IGGY_"));
+        config_builder = config_builder.merge(self.env_provider);
 
         // Finally, attempt to extract the final configuration
-        let config_result: Result<ServerConfig, figment::Error> = 
config_builder.extract();
+        let config_result: Result<T, figment::Error> = 
config_builder.extract();
 
         match config_result {
             Ok(config) => {
@@ -435,7 +405,7 @@ impl ConfigProvider for FileConfigProvider {
             }
             Err(e) => {
                 println!("Failed to load config: {e}");
-                Err(ConfigError::CannotLoadConfiguration)
+                Err(ConfigurationError::CannotLoadConfiguration)
             }
         }
     }
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 4c527a06..ecc5478f 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -17,6 +17,7 @@
 
 mod certificates;
 mod commands;
+mod configs;
 mod error;
 mod traits;
 mod types;
@@ -39,6 +40,8 @@ pub use commands::system::get_cluster_metadata::*;
 pub use commands::system::*;
 pub use commands::topics::*;
 pub use commands::users::*;
+// Configs
+pub use configs::*;
 // Traits
 pub use traits::bytes_serializable::BytesSerializable;
 pub use traits::partitioner::Partitioner;
diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml
index 168deb32..46e1bb60 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -36,9 +36,11 @@ dashmap = { workspace = true }
 dlopen2 = { workspace = true }
 dotenvy = { workspace = true }
 figlet-rs = { workspace = true }
+figment = { workspace = true }
 flume = { workspace = true }
 futures = { workspace = true }
 iggy = { workspace = true }
+iggy_common = { workspace = true }
 iggy_connector_sdk = { workspace = true }
 mimalloc = { workspace = true }
 once_cell = { workspace = true }
diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml
index 3e00608c..ea868f8e 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -15,12 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[http] # Optional HTTP API configuration
+[http_api] # Optional HTTP API configuration
 enabled = true
 address = "127.0.0.1:8081"
 # api_key = "secret" # Optional API key for authentication to be passed as 
`api-key` header
 
-[http.cors] # Optional CORS configuration for HTTP API
+[http_api.cors] # Optional CORS configuration for HTTP API
 enabled = false
 allowed_methods = ["GET", "POST", "PUT", "DELETE"]
 allowed_origins = ["*"]
@@ -29,7 +29,7 @@ exposed_headers = [""]
 allow_credentials = false
 allow_private_network = false
 
-[http.tls] # Optional TLS configuration for HTTP API
+[http_api.tls] # Optional TLS configuration for HTTP API
 enabled = false
 cert_file = "core/certs/iggy_cert.pem"
 key_file = "core/certs/iggy_key.pem"
diff --git a/core/connectors/runtime/src/configs.rs b/core/connectors/runtime/src/configs.rs
index 19a2b4dc..c7a5b41f 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -17,7 +17,13 @@
  */
 
 use crate::api::config::HttpApiConfig;
+use figment::{
+    Metadata, Profile, Provider,
+    providers::{Format, Toml},
+    value::Dict,
+};
 use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
+use iggy_common::{CustomEnvProvider, FileConfigProvider};
 use iggy_connector_sdk::{Schema, transforms::TransformType};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
@@ -42,7 +48,7 @@ pub enum ConfigFormat {
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
 #[serde(default)]
 pub struct RuntimeConfig {
-    pub http: HttpApiConfig,
+    pub http_api: HttpApiConfig,
     pub iggy: IggyConfig,
     pub sinks: HashMap<String, SinkConfig>,
     pub sources: HashMap<String, SourceConfig>,
@@ -114,6 +120,20 @@ pub struct StateConfig {
     pub path: String,
 }
 
+impl std::fmt::Display for RuntimeConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "RuntimeConfig {{ http: {:?}, iggy: {:?}, sinks: {:?}, sources: 
{:?}, state: {:?} }}",
+            self.http_api,
+            self.iggy,
+            self.sinks.keys(),
+            self.sources.keys(),
+            self.state
+        )
+    }
+}
+
 impl Default for StateConfig {
     fn default() -> Self {
         Self {
@@ -144,3 +164,35 @@ impl Default for HttpApiConfig {
         }
     }
 }
+
+impl RuntimeConfig {
+    pub fn config_provider(path: String) -> 
FileConfigProvider<ConnectorsEnvProvider> {
+        let default_config = 
Toml::string(include_str!("../../../connectors/runtime/config.toml"));
+        FileConfigProvider::new(path, default_config, 
ConnectorsEnvProvider::default())
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ConnectorsEnvProvider {
+    provider: CustomEnvProvider<RuntimeConfig>,
+}
+
+impl Default for ConnectorsEnvProvider {
+    fn default() -> Self {
+        Self {
+            provider: CustomEnvProvider::new("IGGY_CONNECTORS_", &[]),
+        }
+    }
+}
+
+impl Provider for ConnectorsEnvProvider {
+    fn metadata(&self) -> Metadata {
+        Metadata::named("iggy-connectors-config")
+    }
+
+    fn data(&self) -> Result<figment::value::Map<Profile, Dict>, 
figment::Error> {
+        self.provider.deserialize().map_err(|_| {
+            figment::Error::from("Cannot deserialize environment variables for 
connectors config")
+        })
+    }
+}
diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs
index d7dea9f8..6dd005b6 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -40,7 +40,7 @@ pub fn init(
     RuntimeContext {
         sinks: SinkManager::new(map_sinks(config, sink_wrappers)),
         sources: SourceManager::new(map_sources(config, source_wrappers)),
-        api_key: config.http.api_key.clone(),
+        api_key: config.http_api.api_key.clone(),
     }
 }
 
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 4ca3369e..bb615ebe 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -16,13 +16,13 @@
  * under the License.
  */
 
-use config::{Config, Environment, File};
 use configs::{ConfigFormat, RuntimeConfig};
 use dlopen2::wrapper::{Container, WrapperApi};
 use dotenvy::dotenv;
 use error::RuntimeError;
 use figlet_rs::FIGfont;
 use iggy::prelude::{Client, IggyConsumer, IggyProducer};
+use iggy_common::ConfigProvider;
 use iggy_connector_sdk::{
     StreamDecoder, StreamEncoder,
     sink::ConsumeCallback,
@@ -124,14 +124,10 @@ async fn main() -> Result<(), RuntimeError> {
     let config_path = config_path.unwrap_or_else(|_| "config".to_string());
     info!("Starting Iggy Connectors Runtime, loading configuration from: 
{config_path}...");
 
-    let config: RuntimeConfig = Config::builder()
-        .add_source(Config::try_from(&RuntimeConfig::default()).expect("Failed 
to init config"))
-        .add_source(File::with_name(&config_path).required(false))
-        .add_source(Environment::with_prefix("IGGY_CONNECTORS").separator("_"))
-        .build()
-        .expect("Failed to build runtime config")
-        .try_deserialize()
-        .expect("Failed to deserialize runtime config");
+    let config: RuntimeConfig = RuntimeConfig::config_provider(config_path)
+        .load_config()
+        .await
+        .expect("Failed to load configuration");
 
     std::fs::create_dir_all(&config.state.path).expect("Failed to create state 
directory");
 
@@ -182,7 +178,7 @@ async fn main() -> Result<(), RuntimeError> {
 
     let context = context::init(&config, &sink_wrappers, &source_wrappers);
     let context = Arc::new(context);
-    api::init(&config.http, context).await;
+    api::init(&config.http_api, context).await;
 
     source::handle(source_wrappers);
     sink::consume(sink_wrappers);
diff --git a/core/connectors/sinks/postgres_sink/src/lib.rs b/core/connectors/sinks/postgres_sink/src/lib.rs
index 2ef6eb6b..0ea6cb7a 100644
--- a/core/connectors/sinks/postgres_sink/src/lib.rs
+++ b/core/connectors/sinks/postgres_sink/src/lib.rs
@@ -70,11 +70,16 @@ impl PostgresSink {
     async fn connect(&mut self) -> Result<(), Error> {
         let max_connections = self.config.max_connections.unwrap_or(10);
 
+        info!(
+            "Connecting to PostgreSQL database with max {} connections, 
connection string: {}",
+            max_connections, self.config.connection_string
+        );
         let pool = PgPoolOptions::new()
             .max_connections(max_connections)
             .connect(&self.config.connection_string)
             .await
             .map_err(|e| Error::InitError(format!("Failed to connect to 
PostgreSQL: {e}")))?;
+        info!("Connected to PostgreSQL database");
 
         sqlx::query("SELECT 1")
             .execute(&pool)
diff --git a/core/integration/src/test_connectors_runtime.rs b/core/integration/src/test_connectors_runtime.rs
index d28f67a7..9a203fc3 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -26,7 +26,11 @@ use std::process::{Child, Command};
 use std::time::Duration;
 use std::{collections::HashMap, net::TcpListener};
 use tokio::time::sleep;
+use uuid::Uuid;
 
+use crate::bench_utils::get_random_path;
+
+const LOCAL_STATE_PREFIX: &str = "local_state_";
 pub const CONSUMER_NAME: &str = "connectors";
 
 #[derive(Debug)]
@@ -67,6 +71,7 @@ impl TestConnectorsRuntime {
             "IGGY_CONNECTORS_IGGY_CONSUMER".to_string(),
             CONSUMER_NAME.to_string(),
         );
+        envs.insert("IGGY_CONNECTORS_STATE_PATH".to_string(), 
get_random_path());
         Self::create(envs, server_executable_path)
     }
 
@@ -85,7 +90,7 @@ impl TestConnectorsRuntime {
 
     pub fn start(&mut self) {
         self.envs
-            .entry("IGGY_CONNECTORS_HTTP_ADDRESS".to_string())
+            .entry("IGGY_CONNECTORS_HTTP_API_ADDRESS".to_string())
             .or_insert(self.server_address.to_string());
         let mut command = if let Some(server_executable_path) = 
&self.server_executable_path {
             Command::new(server_executable_path)
@@ -146,6 +151,10 @@ impl TestConnectorsRuntime {
         self.child_handle.as_ref().unwrap().id()
     }
 
+    pub fn get_random_path() -> String {
+        format!("{}{}", LOCAL_STATE_PREFIX, Uuid::now_v7().to_u128_le())
+    }
+
     fn get_http_api_address(&self) -> String {
         format!(
             "http://{}:{}",
@@ -157,7 +166,7 @@ impl TestConnectorsRuntime {
     pub async fn ensure_started(&self) {
         let http_api_address = self.get_http_api_address();
         let client = reqwest::Client::new();
-        let max_retries = 3000;
+        let max_retries = 1000;
         let mut retries = 0;
         while let Err(error) = client.get(&http_api_address).send().await {
             sleep(Duration::from_millis(20)).await;
diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index 2678eaee..f09c9369 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -30,12 +30,11 @@ use assert_cmd::prelude::CommandCargoExt;
 use async_trait::async_trait;
 use derive_more::Display;
 use futures::executor::block_on;
-use iggy_common::TransportProtocol;
-use uuid::Uuid;
-
 use iggy::prelude::UserStatus::Active;
 use iggy::prelude::*;
-use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
+use iggy_common::{ConfigProvider, TransportProtocol};
+use server::configs::server::ServerConfig;
+use uuid::Uuid;
 
 pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
 pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
@@ -310,13 +309,11 @@ impl TestServer {
 
     fn wait_until_server_has_bound(&mut self) {
         let config_path = format!("{}/runtime/current_config.toml", 
self.local_data_path);
-        let file_config_provider = 
FileConfigProvider::new(config_path.clone());
-
         let max_attempts = (MAX_PORT_WAIT_DURATION_S * 1000) / 
SLEEP_INTERVAL_MS;
         self.server_addrs.clear();
 
         let config = block_on(async {
-            let mut loaded_config = None;
+            let mut loaded_config: Option<ServerConfig> = None;
 
             for _ in 0..max_attempts {
                 if !Path::new(&config_path).exists() {
@@ -328,7 +325,10 @@ impl TestServer {
                     sleep(Duration::from_millis(SLEEP_INTERVAL_MS));
                     continue;
                 }
-                match file_config_provider.load_config().await {
+                match ServerConfig::config_provider(config_path.clone())
+                    .load_config()
+                    .await
+                {
                     Ok(config) => {
                         loaded_config = Some(config);
                         break;
diff --git a/core/integration/tests/config_provider/mod.rs b/core/integration/tests/config_provider/mod.rs
index 5426807d..79de9689 100644
--- a/core/integration/tests/config_provider/mod.rs
+++ b/core/integration/tests/config_provider/mod.rs
@@ -16,22 +16,25 @@
  * under the License.
  */
 
+use iggy_common::{ConfigProvider, ConfigurationError, FileConfigProvider};
 use integration::file::{file_exists, get_root_path};
 use serial_test::serial;
-use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
+use server::configs::server::{ServerConfig, ServerEnvProvider};
 use std::env;
 
 async fn scenario_parsing_from_file(extension: &str) {
     let mut config_path = get_root_path().join("../configs/server");
     assert!(config_path.set_extension(extension), "Cannot set extension");
     let config_path = config_path.as_path().display().to_string();
-    let config_provider = FileConfigProvider::new(config_path.clone());
+    let config_provider = get_file_config_provider();
     assert!(
         file_exists(&config_path),
         "Config file not found: {config_path}"
     );
     assert!(
-        config_provider.load_config().await.is_ok(),
+        ConfigProvider::<ServerConfig>::load_config(config_provider)
+            .await
+            .is_ok(),
         "ConfigProvider failed to parse config from {config_path}"
     );
 }
@@ -71,9 +74,8 @@ async fn validate_custom_env_provider() {
         env::set_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", "10s");
     }
 
-    let config_path = get_root_path().join("../configs/server.toml");
-    let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-    let config = file_config_provider
+    let file_config_provider = get_file_config_provider();
+    let config: ServerConfig = file_config_provider
         .load_config()
         .await
         .expect("Failed to load default server.toml config");
@@ -148,9 +150,8 @@ async fn validate_cluster_config_env_override() {
         env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address);
     }
 
-    let config_path = get_root_path().join("../configs/server.toml");
-    let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-    let config = file_config_provider
+    let file_config_provider = get_file_config_provider();
+    let config: ServerConfig = file_config_provider
         .load_config()
         .await
         .expect("Failed to load server.toml config with cluster env 
overrides");
@@ -217,9 +218,8 @@ async fn validate_cluster_partial_env_override() {
         env::set_var("IGGY_CLUSTER_NODES_1_ADDRESS", expected_node_1_address);
     }
 
-    let config_path = get_root_path().join("../configs/server.toml");
-    let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-    let config = file_config_provider
+    let file_config_provider = get_file_config_provider();
+    let config: ServerConfig = file_config_provider
         .load_config()
         .await
         .expect("Failed to load server.toml config with partial cluster env 
overrides");
@@ -260,11 +260,10 @@ async fn validate_cluster_sparse_array_fails_with_missing_fields() {
         env::set_var("IGGY_CLUSTER_NODES_5_ADDRESS", expected_node_5_address);
     }
 
-    let config_path = get_root_path().join("../configs/server.toml");
-    let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
+    let file_config_provider = get_file_config_provider();
 
     // This should fail because nodes 2-4 will be missing required fields
-    let result = file_config_provider.load_config().await;
+    let result: Result<ServerConfig, ConfigurationError> = 
file_config_provider.load_config().await;
     assert!(
         result.is_err(),
         "Should fail to load config with sparse array due to missing required 
fields in intermediate elements"
@@ -292,9 +291,8 @@ async fn validate_cluster_contiguous_array_override() {
         env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address);
     }
 
-    let config_path = get_root_path().join("../configs/server.toml");
-    let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-    let config = file_config_provider
+    let file_config_provider = get_file_config_provider();
+    let config: ServerConfig = file_config_provider
         .load_config()
         .await
         .expect("Failed to load server.toml config with contiguous array 
override");
@@ -323,3 +321,8 @@ async fn validate_cluster_contiguous_array_override() {
         env::remove_var("IGGY_CLUSTER_NODES_2_ADDRESS");
     }
 }
+
+fn get_file_config_provider() -> FileConfigProvider<ServerEnvProvider> {
+    let config_path = get_root_path().join("../configs/server.toml");
+    ServerConfig::config_provider(config_path.as_path().display().to_string())
+}
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 789ca3ed..b437f8dc 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -23,34 +23,25 @@ use integration::{
     test_connectors_runtime::TestConnectorsRuntime,
     test_server::{ClientFactory, IpAddrKind, TestServer},
 };
-use serial_test::parallel;
 use std::collections::HashMap;
 
-mod sinks;
-mod sources;
+mod postgres;
 
-#[tokio::test]
-#[parallel]
-async fn connectors_runtime_should_start() {
-    let mut infra = setup();
-    infra.start_connectors_runtime(None, None).await;
-}
-
-fn setup() -> ConnectorsInfra {
+fn setup_runtime() -> ConnectorsInfra {
     let mut iggy_envs = HashMap::new();
     iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned());
     let mut test_server = TestServer::new(Some(iggy_envs), true, None, 
IpAddrKind::V4);
     test_server.start();
     ConnectorsInfra {
         iggy_server: test_server,
-        connectors_runtim: None,
+        connectors_runtime: None,
     }
 }
 
 #[derive(Debug)]
 struct ConnectorsInfra {
     iggy_server: TestServer,
-    connectors_runtim: Option<TestConnectorsRuntime>,
+    connectors_runtime: Option<TestConnectorsRuntime>,
 }
 
 impl ConnectorsInfra {
@@ -59,7 +50,7 @@ impl ConnectorsInfra {
         config_path: Option<&str>,
         envs: Option<HashMap<String, String>>,
     ) {
-        if self.connectors_runtim.is_some() {
+        if self.connectors_runtime.is_some() {
             return;
         }
 
@@ -90,7 +81,7 @@ impl ConnectorsInfra {
             TestConnectorsRuntime::with_iggy_address(&iggy_server_address, 
all_envs);
         connectors_runtime.start();
         connectors_runtime.ensure_started().await;
-        self.connectors_runtim = Some(connectors_runtime);
+        self.connectors_runtime = Some(connectors_runtime);
     }
 
     pub async fn create_client(&self) -> IggyClient {
diff --git a/core/integration/tests/connectors/sinks/postgres/mod.rs b/core/integration/tests/connectors/postgres/mod.rs
similarity index 81%
rename from core/integration/tests/connectors/sinks/postgres/mod.rs
rename to core/integration/tests/connectors/postgres/mod.rs
index 8ab8fc8e..98cd0eea 100644
--- a/core/integration/tests/connectors/sinks/postgres/mod.rs
+++ b/core/integration/tests/connectors/postgres/mod.rs
@@ -16,11 +16,12 @@
  * under the License.
  */
 
-use crate::connectors::setup;
+use crate::connectors::setup_runtime;
 use iggy_binary_protocol::{StreamClient, TopicClient};
 use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
 use std::collections::HashMap;
 use testcontainers_modules::{postgres, testcontainers::runners::AsyncRunner};
+use tokio::time;
 
 #[tokio::test]
 async fn given_valid_configuration_postgres_sink_should_start() {
@@ -34,11 +35,16 @@ async fn given_valid_configuration_postgres_sink_should_start() {
         .expect("Failed to get Postgres port");
 
     let mut envs = HashMap::new();
+    let connection_string = 
format!("postgres://postgres:postgres@localhost:{host_port}");
     envs.insert(
         "IGGY_CONNECTORS_SINKS_POSTGRES_CONFIG_CONNECTION_STRING".to_owned(),
-        format!("postgres://postgres:postgres@localhost:{host_port}"),
+        connection_string,
     );
-    let mut infra = setup();
+    println!(
+        "Connection string: {}",
+        envs["IGGY_CONNECTORS_SINKS_POSTGRES_CONFIG_CONNECTION_STRING"]
+    );
+    let mut infra = setup_runtime();
     let client = infra.create_client().await;
     let stream_name = "test";
     let topic_name = "test";
@@ -61,6 +67,8 @@ async fn given_valid_configuration_postgres_sink_should_start() {
         .await
         .expect("Failed to create topic");
     infra
-        .start_connectors_runtime(Some("sinks/postgres/postgres.toml"), 
Some(envs))
+        .start_connectors_runtime(Some("postgres/postgres.toml"), Some(envs))
         .await;
+    time::sleep(std::time::Duration::from_secs(5)).await;
+    println!("Bye");
 }
diff --git a/core/integration/tests/connectors/sinks/postgres/postgres.toml b/core/integration/tests/connectors/postgres/postgres.toml
similarity index 100%
rename from core/integration/tests/connectors/sinks/postgres/postgres.toml
rename to core/integration/tests/connectors/postgres/postgres.toml
diff --git a/core/integration/tests/connectors/sources/postgres/mod.rs b/core/integration/tests/connectors/postgres/postgres_sink.rs
similarity index 100%
copy from core/integration/tests/connectors/sources/postgres/mod.rs
copy to core/integration/tests/connectors/postgres/postgres_sink.rs
diff --git a/core/integration/tests/connectors/sources/postgres/mod.rs b/core/integration/tests/connectors/postgres/postgres_source.rs
similarity index 100%
rename from core/integration/tests/connectors/sources/postgres/mod.rs
rename to core/integration/tests/connectors/postgres/postgres_source.rs
diff --git a/core/integration/tests/connectors/sinks/mod.rs b/core/integration/tests/connectors/sinks/mod.rs
deleted file mode 100644
index 9c64d589..00000000
--- a/core/integration/tests/connectors/sinks/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-mod postgres;
diff --git a/core/integration/tests/connectors/sources/mod.rs b/core/integration/tests/connectors/sources/mod.rs
deleted file mode 100644
index 9c64d589..00000000
--- a/core/integration/tests/connectors/sources/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-mod postgres;
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 049e6290..7f457b7e 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -58,7 +58,7 @@ dotenvy = { workspace = true }
 enum_dispatch = { workspace = true }
 error_set = { version = "0.8.5", features = ["tracing"] }
 figlet-rs = { workspace = true }
-figment = { version = "0.10.19", features = ["toml", "env"] }
+figment = { workspace = true }
 flume = { workspace = true }
 futures = { workspace = true }
 human-repr = { workspace = true }
diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs/mod.rs
index 936994ac..8c48e345 100644
--- a/core/server/src/configs/mod.rs
+++ b/core/server/src/configs/mod.rs
@@ -18,7 +18,6 @@
 
 pub mod cache_indexes;
 pub mod cluster;
-pub mod config_provider;
 pub mod defaults;
 pub mod displays;
 pub mod http;
diff --git a/core/server/src/configs/server.rs b/core/server/src/configs/server.rs
index 2e897922..c977075b 100644
--- a/core/server/src/configs/server.rs
+++ b/core/server/src/configs/server.rs
@@ -16,10 +16,10 @@
  * under the License.
  */
 
+use crate::IGGY_ROOT_PASSWORD_ENV;
 use crate::archiver::ArchiverKindType;
 use crate::configs::COMPONENT;
 use crate::configs::cluster::ClusterConfig;
-use crate::configs::config_provider::ConfigProviderKind;
 use crate::configs::http::HttpConfig;
 use crate::configs::quic::QuicConfig;
 use crate::configs::system::SystemConfig;
@@ -27,14 +27,35 @@ use crate::configs::tcp::TcpConfig;
 use crate::server_error::ConfigError;
 use derive_more::Display;
 use error_set::ErrContext;
+use figment::Metadata;
+use figment::Profile;
+use figment::Provider;
+use figment::providers::Format;
+use figment::providers::Toml;
+use figment::value::Dict;
+use iggy_common::ConfigProvider;
+use iggy_common::CustomEnvProvider;
+use iggy_common::FileConfigProvider;
 use iggy_common::IggyDuration;
 use iggy_common::Validatable;
 use serde::{Deserialize, Serialize};
 use serde_with::DisplayFromStr;
 use serde_with::serde_as;
+use std::env;
 use std::str::FromStr;
 use std::sync::Arc;
 
+const DEFAULT_CONFIG_PROVIDER: &str = "file";
+const DEFAULT_CONFIG_PATH: &str = "core/configs/server.toml";
+const SECRET_KEYS: [&str; 6] = [
+    IGGY_ROOT_PASSWORD_ENV,
+    "IGGY_DATA_MAINTENANCE_ARCHIVER_S3_KEY_SECRET",
+    "IGGY_HTTP_JWT_ENCODING_SECRET",
+    "IGGY_HTTP_JWT_DECODING_SECRET",
+    "IGGY_TCP_TLS_PASSWORD",
+    "IGGY_SYSTEM_ENCRYPTION_KEY",
+];
+
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub struct ServerConfig {
     pub data_maintenance: DataMaintenanceConfig,
@@ -169,8 +190,38 @@ impl FromStr for TelemetryTransport {
     }
 }
 
+pub fn resolve(config_provider_type: &str) -> Result<ConfigProviderKind, 
ConfigError> {
+    match config_provider_type {
+        DEFAULT_CONFIG_PROVIDER => {
+            let path =
+                env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| 
DEFAULT_CONFIG_PATH.to_string());
+            Ok(ConfigProviderKind::File(ServerConfig::config_provider(
+                path,
+            )))
+        }
+        _ => Err(ConfigError::InvalidConfigurationProvider {
+            provider_type: config_provider_type.to_string(),
+        }),
+    }
+}
+
+pub enum ConfigProviderKind {
+    File(FileConfigProvider<ServerEnvProvider>),
+}
+
+impl ConfigProviderKind {
+    pub async fn load_config(self) -> Result<ServerConfig, ConfigError> {
+        match self {
+            Self::File(p) => p
+                .load_config()
+                .await
+                .map_err(|_| ConfigError::CannotLoadConfiguration),
+        }
+    }
+}
+
 impl ServerConfig {
-    pub async fn load(config_provider: &ConfigProviderKind) -> 
Result<ServerConfig, ConfigError> {
+    pub async fn load(config_provider: ConfigProviderKind) -> 
Result<ServerConfig, ConfigError> {
         let server_config = config_provider
             .load_config()
             .await
@@ -182,4 +233,34 @@ impl ServerConfig {
         })?;
         Ok(server_config)
     }
+
+    pub fn config_provider(path: String) -> 
FileConfigProvider<ServerEnvProvider> {
+        let default_config = 
Toml::string(include_str!("../../../configs/server.toml"));
+        FileConfigProvider::new(path, default_config, 
ServerEnvProvider::default())
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ServerEnvProvider {
+    provider: CustomEnvProvider<ServerConfig>,
+}
+
+impl Default for ServerEnvProvider {
+    fn default() -> Self {
+        Self {
+            provider: CustomEnvProvider::new("IGGY_", &SECRET_KEYS),
+        }
+    }
+}
+
+impl Provider for ServerEnvProvider {
+    fn metadata(&self) -> Metadata {
+        Metadata::named("iggy-server-config")
+    }
+
+    fn data(&self) -> Result<figment::value::Map<Profile, Dict>, 
figment::Error> {
+        self.provider.deserialize().map_err(|_| {
+            figment::Error::from("Cannot deserialize environment variables for 
server config")
+        })
+    }
 }
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 51c3cabb..1ed5e438 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -28,7 +28,7 @@ use server::channels::commands::print_sysinfo::SysInfoPrintExecutor;
 use server::channels::commands::save_messages::SaveMessagesExecutor;
 use server::channels::commands::verify_heartbeats::VerifyHeartbeatsExecutor;
 use server::channels::handler::BackgroundServerCommandHandler;
-use server::configs::config_provider;
+use server::configs;
 use server::configs::server::ServerConfig;
 use server::http::http_server;
 #[cfg(not(feature = "tokio-console"))]
@@ -68,8 +68,8 @@ async fn main() -> Result<(), ServerError> {
 
     let args = Args::parse();
 
-    let config_provider = config_provider::resolve(&args.config_provider)?;
-    let config = ServerConfig::load(&config_provider).await?;
+    let config_provider = configs::server::resolve(&args.config_provider)?;
+    let config = ServerConfig::load(config_provider).await?;
     if args.fresh {
         let system_path = config.system.get_system_path();
         if tokio::fs::metadata(&system_path).await.is_ok() {


Reply via email to