This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_test
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_test by this push:
new 785266c1 Common config loader WiP
785266c1 is described below
commit 785266c1cabaee76def39e0dae437531202939e3
Author: spetz <[email protected]>
AuthorDate: Sun Sep 21 23:16:33 2025 +0200
Common config loader WiP
---
Cargo.lock | 4 +
Cargo.toml | 1 +
DEPENDENCIES.md | 22 +++
core/common/Cargo.toml | 2 +
.../src/configs/mod.rs} | 184 +++++++++------------
core/common/src/lib.rs | 3 +
core/connectors/runtime/Cargo.toml | 2 +
core/connectors/runtime/config.toml | 6 +-
core/connectors/runtime/src/configs.rs | 54 +++++-
core/connectors/runtime/src/context.rs | 2 +-
core/connectors/runtime/src/main.rs | 16 +-
core/connectors/sinks/postgres_sink/src/lib.rs | 5 +
core/integration/src/test_connectors_runtime.rs | 13 +-
core/integration/src/test_server.rs | 16 +-
core/integration/tests/config_provider/mod.rs | 39 +++--
core/integration/tests/connectors/mod.rs | 21 +--
.../tests/connectors/{sinks => }/postgres/mod.rs | 16 +-
.../connectors/{sinks => }/postgres/postgres.toml | 0
.../postgres/mod.rs => postgres/postgres_sink.rs} | 0
.../mod.rs => postgres/postgres_source.rs} | 0
core/integration/tests/connectors/sinks/mod.rs | 19 ---
core/integration/tests/connectors/sources/mod.rs | 19 ---
core/server/Cargo.toml | 2 +-
core/server/src/configs/mod.rs | 1 -
core/server/src/configs/server.rs | 85 +++++++++-
core/server/src/main.rs | 6 +-
26 files changed, 324 insertions(+), 214 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 75de1710..192783f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4043,9 +4043,11 @@ dependencies = [
"dlopen2",
"dotenvy",
"figlet-rs",
+ "figment",
"flume",
"futures",
"iggy",
+ "iggy_common",
"iggy_connector_sdk",
"mimalloc",
"once_cell",
@@ -4120,6 +4122,7 @@ dependencies = [
"crc32fast",
"derive_more 2.0.1",
"fast-async-mutex",
+ "figment",
"humantime",
"rcgen",
"rustls",
@@ -4129,6 +4132,7 @@ dependencies = [
"strum",
"thiserror 2.0.16",
"tokio",
+ "toml 0.9.6",
"tracing",
]
diff --git a/Cargo.toml b/Cargo.toml
index 1ab0abee..1f60b229 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ dotenvy = "0.15.7"
enum_dispatch = "0.3.13"
env_logger = "0.11.8"
figlet-rs = "0.1.5"
+figment = { version = "0.10.19", features = ["toml", "env"] }
flume = "0.11.1"
futures = "0.3.31"
futures-util = "0.3.31"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 34a7bc6d..e370afda 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -84,6 +84,9 @@ bitvec: 1.0.1, "MIT",
blake2: 0.10.6, "Apache-2.0 OR MIT",
blake3: 1.8.2, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0",
block-buffer: 0.10.4, "Apache-2.0 OR MIT",
+bollard: 0.19.1, "Apache-2.0",
+bollard-buildkit-proto: 0.6.1, "Apache-2.0",
+bollard-stubs: 1.48.3-rc.28.0.4, "Apache-2.0",
bon: 3.7.2, "Apache-2.0 OR MIT",
bon-macros: 3.7.2, "Apache-2.0 OR MIT",
boolinator: 2.4.0, "Apache-2.0 OR MIT",
@@ -196,6 +199,7 @@ dlopen2: 0.8.0, "Custom License File",
dlopen2_derive: 0.4.1, "Custom License File",
dlv-list: 0.5.2, "Apache-2.0 OR MIT",
doc-comment: 0.3.3, "MIT",
+docker_credential: 1.3.2, "Apache-2.0 OR MIT",
document-features: 0.2.11, "Apache-2.0 OR MIT",
dotenvy: 0.15.7, "MIT",
downcast: 0.11.0, "MIT",
@@ -220,6 +224,7 @@ errno: 0.3.14, "Apache-2.0 OR MIT",
error_set: 0.8.5, "Apache-2.0",
error_set_impl: 0.8.5, "Apache-2.0",
etcetera: 0.8.0, "Apache-2.0 OR MIT",
+etcetera: 0.10.0, "Apache-2.0 OR MIT",
event-listener: 5.4.1, "Apache-2.0 OR MIT",
event-listener-strategy: 0.5.4, "Apache-2.0 OR MIT",
ext-trait: 1.0.1, "Apache-2.0 OR MIT OR Zlib",
@@ -231,6 +236,7 @@ fastrand: 2.3.0, "Apache-2.0 OR MIT",
figlet-rs: 0.1.5, "Apache-2.0",
figment: 0.10.19, "Apache-2.0 OR MIT",
file-operation: 0.8.4, "MIT",
+filetime: 0.2.26, "Apache-2.0 OR MIT",
find-msvc-tools: 0.1.1, "Apache-2.0 OR MIT",
flatbuffers: 25.2.10, "Apache-2.0",
flate2: 1.1.2, "Apache-2.0 OR MIT",
@@ -327,9 +333,11 @@ httpdate: 1.0.3, "Apache-2.0 OR MIT",
human-repr: 1.1.0, "MIT",
humantime: 2.3.0, "Apache-2.0 OR MIT",
hyper: 1.7.0, "MIT",
+hyper-named-pipe: 0.1.0, "Apache-2.0",
hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT",
hyper-timeout: 0.5.2, "Apache-2.0 OR MIT",
hyper-util: 0.1.17, "MIT",
+hyperlocal: 0.9.1, "MIT",
iana-time-zone: 0.1.64, "Apache-2.0 OR MIT",
iana-time-zone-haiku: 0.1.2, "Apache-2.0 OR MIT",
icu_collections: 2.0.0, "Unicode-3.0",
@@ -460,13 +468,16 @@ nougat: 0.2.4, "Apache-2.0 OR MIT OR Zlib",
nougat-proc_macros: 0.2.4, "Apache-2.0 OR MIT OR Zlib",
ntapi: 0.4.1, "Apache-2.0 OR MIT",
nu-ansi-term: 0.50.1, "MIT",
+num: 0.4.3, "Apache-2.0 OR MIT",
num-bigint: 0.4.6, "Apache-2.0 OR MIT",
num-bigint-dig: 0.8.4, "Apache-2.0 OR MIT",
+num-complex: 0.4.6, "Apache-2.0 OR MIT",
num-conv: 0.1.0, "Apache-2.0 OR MIT",
num-integer: 0.1.46, "Apache-2.0 OR MIT",
num-iter: 0.1.45, "Apache-2.0 OR MIT",
num-modular: 0.6.1, "Apache-2.0",
num-order: 1.2.0, "Apache-2.0",
+num-rational: 0.4.2, "Apache-2.0 OR MIT",
num-traits: 0.2.19, "Apache-2.0 OR MIT",
num_cpus: 1.17.0, "Apache-2.0 OR MIT",
num_threads: 0.1.7, "Apache-2.0 OR MIT",
@@ -496,6 +507,8 @@ parking_lot: 0.11.2, "Apache-2.0 OR MIT",
parking_lot: 0.12.4, "Apache-2.0 OR MIT",
parking_lot_core: 0.8.6, "Apache-2.0 OR MIT",
parking_lot_core: 0.9.11, "Apache-2.0 OR MIT",
+parse-display: 0.9.1, "Apache-2.0 OR MIT",
+parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
passterm: 2.0.1, "BSD-3-Clause",
password-hash: 0.5.0, "Apache-2.0 OR MIT",
paste: 1.0.15, "Apache-2.0 OR MIT",
@@ -573,6 +586,7 @@ rayon: 1.11.0, "Apache-2.0 OR MIT",
rayon-core: 1.13.0, "Apache-2.0 OR MIT",
rcgen: 0.14.4, "Apache-2.0 OR MIT",
redox_syscall: 0.2.16, "MIT",
+redox_syscall: 0.3.5, "MIT",
redox_syscall: 0.5.17, "MIT",
redox_users: 0.5.2, "MIT",
ref-cast: 1.0.24, "Apache-2.0 OR MIT",
@@ -637,6 +651,7 @@ serde_derive: 1.0.225, "Apache-2.0 OR MIT",
serde_derive_internals: 0.29.1, "Apache-2.0 OR MIT",
serde_json: 1.0.145, "Apache-2.0 OR MIT",
serde_path_to_error: 0.1.20, "Apache-2.0 OR MIT",
+serde_repr: 0.1.20, "Apache-2.0 OR MIT",
serde_spanned: 0.6.9, "Apache-2.0 OR MIT",
serde_spanned: 1.0.1, "Apache-2.0 OR MIT",
serde_urlencoded: 0.7.1, "Apache-2.0 OR MIT",
@@ -681,6 +696,8 @@ static-toml: 1.3.0, "MIT",
static_assertions: 1.1.0, "Apache-2.0 OR MIT",
stringprep: 0.1.5, "Apache-2.0 OR MIT",
strsim: 0.11.1, "MIT",
+structmeta: 0.3.0, "Apache-2.0 OR MIT",
+structmeta-derive: 0.3.0, "Apache-2.0 OR MIT",
strum: 0.27.2, "MIT",
strum_macros: 0.27.2, "MIT",
subtle: 2.6.1, "BSD-3-Clause",
@@ -701,6 +718,8 @@ termtree: 0.5.1, "MIT",
test-case: 3.3.1, "MIT",
test-case-core: 3.3.1, "MIT",
test-case-macros: 3.3.1, "MIT",
+testcontainers: 0.25.0, "Apache-2.0 OR MIT",
+testcontainers-modules: 0.13.0, "MIT",
textwrap: 0.16.2, "MIT",
thiserror: 1.0.69, "Apache-2.0 OR MIT",
thiserror: 2.0.16, "Apache-2.0 OR MIT",
@@ -718,6 +737,7 @@ tokio: 1.47.1, "MIT",
tokio-macros: 2.5.0, "MIT",
tokio-rustls: 0.26.3, "Apache-2.0 OR MIT",
tokio-stream: 0.1.17, "MIT",
+tokio-tar: 0.3.1, "Apache-2.0 OR MIT",
tokio-util: 0.7.16, "MIT",
toml: 0.8.23, "Apache-2.0 OR MIT",
toml: 0.9.6, "Apache-2.0 OR MIT",
@@ -767,6 +787,7 @@ unicode-xid: 0.2.6, "Apache-2.0 OR MIT",
universal-hash: 0.5.1, "Apache-2.0 OR MIT",
untrusted: 0.9.0, "ISC",
unty: 0.0.4, "Apache-2.0 OR MIT",
+ureq: 2.12.1, "Apache-2.0 OR MIT",
url: 2.5.7, "Apache-2.0 OR MIT",
urlencoding: 2.1.3, "MIT",
utf8-width: 0.1.7, "MIT",
@@ -872,6 +893,7 @@ winnow: 0.7.13, "MIT",
wit-bindgen: 0.46.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
writeable: 0.6.1, "Unicode-3.0",
wyz: 0.5.1, "MIT",
+xattr: 1.5.1, "Apache-2.0 OR MIT",
yaml-rust2: 0.10.4, "Apache-2.0 OR MIT",
yansi: 1.0.1, "Apache-2.0 OR MIT",
yasna: 0.5.2, "Apache-2.0 OR MIT",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 48affb57..edff331e 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -45,6 +45,7 @@ comfy-table = { workspace = true }
crc32fast = { workspace = true }
derive_more = { workspace = true }
fast-async-mutex = { version = "0.6.7", optional = true }
+figment = { workspace = true }
humantime = { workspace = true }
rcgen = "0.14.4"
rustls = { workspace = true }
@@ -54,4 +55,5 @@ serde_with = { workspace = true, features = ["base64"] }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
+toml = { workspace = true }
tracing = { workspace = true }
diff --git a/core/server/src/configs/config_provider.rs
b/core/common/src/configs/mod.rs
similarity index 80%
rename from core/server/src/configs/config_provider.rs
rename to core/common/src/configs/mod.rs
index b77ec4bd..c4cb8584 100644
--- a/core/server/src/configs/config_provider.rs
+++ b/core/common/src/configs/mod.rs
@@ -16,64 +16,92 @@
* under the License.
*/
-use crate::IGGY_ROOT_PASSWORD_ENV;
-use crate::configs::server::ServerConfig;
-use crate::server_error::ConfigError;
use figment::{
- Error, Figment, Metadata, Profile, Provider,
- providers::{Format, Toml},
+ Figment, Profile, Provider,
+ providers::{Data, Format, Toml},
value::{Dict, Map as FigmentMap, Tag, Value as FigmentValue},
};
-use std::{env, future::Future, path::Path};
+use serde::{Serialize, de::DeserializeOwned};
+use std::{env, fmt::Display, future::Future, marker::PhantomData, path::Path};
use toml::{Value as TomlValue, map::Map};
-const DEFAULT_CONFIG_PROVIDER: &str = "file";
-const DEFAULT_CONFIG_PATH: &str = "configs/server.toml";
-const SECRET_KEYS: [&str; 6] = [
- IGGY_ROOT_PASSWORD_ENV,
- "IGGY_DATA_MAINTENANCE_ARCHIVER_S3_KEY_SECRET",
- "IGGY_HTTP_JWT_ENCODING_SECRET",
- "IGGY_HTTP_JWT_DECODING_SECRET",
- "IGGY_TCP_TLS_PASSWORD",
- "IGGY_SYSTEM_ENCRYPTION_KEY",
-];
-
-pub enum ConfigProviderKind {
- File(FileConfigProvider),
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ConfigurationError {
+ CannotLoadConfiguration,
}
-impl ConfigProviderKind {
- pub async fn load_config(&self) -> Result<ServerConfig, ConfigError> {
- match self {
- Self::File(p) => p.load_config().await,
- }
- }
-}
-
-pub trait ConfigProvider {
- fn load_config(&self) -> impl Future<Output = Result<ServerConfig,
ConfigError>>;
+pub trait ConfigProvider<T: Serialize + DeserializeOwned + Default + Display> {
+ fn load_config(self) -> impl Future<Output = Result<T,
ConfigurationError>>;
}
-#[derive(Debug)]
-pub struct FileConfigProvider {
+pub struct FileConfigProvider<T: Provider> {
path: String,
+ default_config: Data<Toml>,
+ env_provider: T,
}
-pub struct CustomEnvProvider {
- prefix: String,
+impl<T: Provider> FileConfigProvider<T> {
+ pub fn new(path: String, default_config: Data<Toml>, env_provider: T) ->
Self {
+ Self {
+ path,
+ default_config,
+ env_provider,
+ }
+ }
}
-impl FileConfigProvider {
- pub fn new(path: String) -> Self {
- Self { path }
- }
+#[derive(Debug, Clone)]
+pub struct CustomEnvProvider<T: Serialize + DeserializeOwned + Default +
Display> {
+ prefix: String,
+ secret_keys: Vec<String>,
+ _data: PhantomData<T>,
}
-impl CustomEnvProvider {
- pub fn new(prefix: &str) -> Self {
+impl<T: Serialize + DeserializeOwned + Default + Display> CustomEnvProvider<T>
{
+ pub fn new(prefix: &str, secret_keys: &[&str]) -> Self {
Self {
prefix: prefix.to_string(),
+ secret_keys: secret_keys.iter().map(|s| s.to_string()).collect(),
+ _data: PhantomData,
+ }
+ }
+
+ pub fn deserialize(&self) -> Result<FigmentMap<Profile, Dict>,
ConfigurationError> {
+ let default_config = toml::to_string(&T::default())
+ .expect("Cannot serialize default Config. Something's terribly
wrong.");
+ let toml_value: TomlValue = toml::from_str(&default_config)
+ .expect("Cannot parse default Config. Something's terribly
wrong.");
+ let mut source_dict = Dict::new();
+ if let TomlValue::Table(table) = toml_value {
+ Self::walk_toml_table_to_dict("", table, &mut source_dict);
+ }
+
+ let mut new_dict = Dict::new();
+ for (key, mut value) in env::vars() {
+ let env_key = key.to_uppercase();
+ if !env_key.starts_with(self.prefix.as_str()) {
+ continue;
+ }
+ let keys: Vec<String> = env_key[self.prefix.len()..]
+ .split('_')
+ .map(|k| k.to_lowercase())
+ .collect();
+ let env_var_value = Self::try_parse_value(&value);
+ if self.secret_keys.contains(&env_key) {
+ value = "******".to_string();
+ }
+
+ println!("{env_key} value changed to: {value} from environment
variable");
+ Self::insert_overridden_values_from_env(
+ &source_dict,
+ &mut new_dict,
+ keys.clone(),
+ env_var_value.clone(),
+ );
}
+ let mut data = FigmentMap::new();
+ data.insert(Profile::default(), new_dict);
+ Ok(data)
}
fn walk_toml_table_to_dict(prefix: &str, table: Map<String, TomlValue>,
dict: &mut Dict) {
@@ -316,63 +344,6 @@ impl CustomEnvProvider {
}
}
-impl Provider for CustomEnvProvider {
- fn metadata(&self) -> Metadata {
- Metadata::named("iggy-server config")
- }
-
- fn data(&self) -> Result<FigmentMap<Profile, Dict>, Error> {
- let default_config = toml::to_string(&ServerConfig::default())
- .expect("Cannot serialize default ServerConfig. Something's
terribly wrong.");
- let toml_value: TomlValue = toml::from_str(&default_config).unwrap();
- let mut source_dict = Dict::new();
- if let TomlValue::Table(table) = toml_value {
- Self::walk_toml_table_to_dict("", table, &mut source_dict);
- }
-
- let mut new_dict = Dict::new();
- for (key, mut value) in env::vars() {
- let env_key = key.to_uppercase();
- if !env_key.starts_with(self.prefix.as_str()) {
- continue;
- }
- let keys: Vec<String> = env_key[self.prefix.len()..]
- .split('_')
- .map(|k| k.to_lowercase())
- .collect();
- let env_var_value = Self::try_parse_value(&value);
- if SECRET_KEYS.contains(&env_key.as_str()) {
- value = "******".to_string();
- }
-
- println!("{env_key} value changed to: {value} from environment
variable");
- Self::insert_overridden_values_from_env(
- &source_dict,
- &mut new_dict,
- keys.clone(),
- env_var_value.clone(),
- );
- }
- let mut data = FigmentMap::new();
- data.insert(Profile::default(), new_dict);
-
- Ok(data)
- }
-}
-
-pub fn resolve(config_provider_type: &str) -> Result<ConfigProviderKind,
ConfigError> {
- match config_provider_type {
- DEFAULT_CONFIG_PROVIDER => {
- let path =
- env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_|
DEFAULT_CONFIG_PATH.to_string());
- Ok(ConfigProviderKind::File(FileConfigProvider::new(path)))
- }
- _ => Err(ConfigError::InvalidConfigurationProvider {
- provider_type: config_provider_type.to_string(),
- }),
- }
-}
-
/// This does exactly the same as Figment does internally.
fn file_exists<P: AsRef<Path>>(path: P) -> bool {
let path = path.as_ref();
@@ -400,32 +371,31 @@ fn file_exists<P: AsRef<Path>>(path: P) -> bool {
}
}
-impl ConfigProvider for FileConfigProvider {
- async fn load_config(&self) -> Result<ServerConfig, ConfigError> {
+impl<T: Serialize + DeserializeOwned + Default + Display, P: Provider + Clone>
ConfigProvider<T>
+ for FileConfigProvider<P>
+{
+ async fn load_config(self) -> Result<T, ConfigurationError> {
println!("Loading config from path: '{}'...", self.path);
- // Include the default configuration from server.toml
- let embedded_default_config =
Toml::string(include_str!("../../../configs/server.toml"));
-
// Start with the default configuration
- let mut config_builder = Figment::new().merge(embedded_default_config);
+ let mut config_builder = Figment::new().merge(self.default_config);
- // If the server.toml file exists, merge it into the configuration
+ // If the config file exists, merge it into the configuration
if file_exists(&self.path) {
println!("Found configuration file at path: '{}'.", self.path);
config_builder = config_builder.merge(Toml::file(&self.path));
} else {
println!(
- "Configuration file not found at path: '{}'. Using default
configuration from embedded server.toml.",
+ "Configuration file not found at path: '{}'. Using default
configuration from embedded config",
self.path
);
}
// Merge environment variables into the configuration
- config_builder = config_builder.merge(CustomEnvProvider::new("IGGY_"));
+ config_builder = config_builder.merge(self.env_provider);
// Finally, attempt to extract the final configuration
- let config_result: Result<ServerConfig, figment::Error> =
config_builder.extract();
+ let config_result: Result<T, figment::Error> =
config_builder.extract();
match config_result {
Ok(config) => {
@@ -435,7 +405,7 @@ impl ConfigProvider for FileConfigProvider {
}
Err(e) => {
println!("Failed to load config: {e}");
- Err(ConfigError::CannotLoadConfiguration)
+ Err(ConfigurationError::CannotLoadConfiguration)
}
}
}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 4c527a06..ecc5478f 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -17,6 +17,7 @@
mod certificates;
mod commands;
+mod configs;
mod error;
mod traits;
mod types;
@@ -39,6 +40,8 @@ pub use commands::system::get_cluster_metadata::*;
pub use commands::system::*;
pub use commands::topics::*;
pub use commands::users::*;
+// Configs
+pub use configs::*;
// Traits
pub use traits::bytes_serializable::BytesSerializable;
pub use traits::partitioner::Partitioner;
diff --git a/core/connectors/runtime/Cargo.toml
b/core/connectors/runtime/Cargo.toml
index 168deb32..46e1bb60 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -36,9 +36,11 @@ dashmap = { workspace = true }
dlopen2 = { workspace = true }
dotenvy = { workspace = true }
figlet-rs = { workspace = true }
+figment = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
iggy = { workspace = true }
+iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
mimalloc = { workspace = true }
once_cell = { workspace = true }
diff --git a/core/connectors/runtime/config.toml
b/core/connectors/runtime/config.toml
index 3e00608c..ea868f8e 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -15,12 +15,12 @@
# specific language governing permissions and limitations
# under the License.
-[http] # Optional HTTP API configuration
+[http_api] # Optional HTTP API configuration
enabled = true
address = "127.0.0.1:8081"
# api_key = "secret" # Optional API key for authentication to be passed as
`api-key` header
-[http.cors] # Optional CORS configuration for HTTP API
+[http_api.cors] # Optional CORS configuration for HTTP API
enabled = false
allowed_methods = ["GET", "POST", "PUT", "DELETE"]
allowed_origins = ["*"]
@@ -29,7 +29,7 @@ exposed_headers = [""]
allow_credentials = false
allow_private_network = false
-[http.tls] # Optional TLS configuration for HTTP API
+[http_api.tls] # Optional TLS configuration for HTTP API
enabled = false
cert_file = "core/certs/iggy_cert.pem"
key_file = "core/certs/iggy_key.pem"
diff --git a/core/connectors/runtime/src/configs.rs
b/core/connectors/runtime/src/configs.rs
index 19a2b4dc..c7a5b41f 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -17,7 +17,13 @@
*/
use crate::api::config::HttpApiConfig;
+use figment::{
+ Metadata, Profile, Provider,
+ providers::{Format, Toml},
+ value::Dict,
+};
use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
+use iggy_common::{CustomEnvProvider, FileConfigProvider};
use iggy_connector_sdk::{Schema, transforms::TransformType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -42,7 +48,7 @@ pub enum ConfigFormat {
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct RuntimeConfig {
- pub http: HttpApiConfig,
+ pub http_api: HttpApiConfig,
pub iggy: IggyConfig,
pub sinks: HashMap<String, SinkConfig>,
pub sources: HashMap<String, SourceConfig>,
@@ -114,6 +120,20 @@ pub struct StateConfig {
pub path: String,
}
+impl std::fmt::Display for RuntimeConfig {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "RuntimeConfig {{ http: {:?}, iggy: {:?}, sinks: {:?}, sources:
{:?}, state: {:?} }}",
+ self.http_api,
+ self.iggy,
+ self.sinks.keys(),
+ self.sources.keys(),
+ self.state
+ )
+ }
+}
+
impl Default for StateConfig {
fn default() -> Self {
Self {
@@ -144,3 +164,35 @@ impl Default for HttpApiConfig {
}
}
}
+
+impl RuntimeConfig {
+ pub fn config_provider(path: String) ->
FileConfigProvider<ConnectorsEnvProvider> {
+ let default_config =
Toml::string(include_str!("../../../connectors/runtime/config.toml"));
+ FileConfigProvider::new(path, default_config,
ConnectorsEnvProvider::default())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct ConnectorsEnvProvider {
+ provider: CustomEnvProvider<RuntimeConfig>,
+}
+
+impl Default for ConnectorsEnvProvider {
+ fn default() -> Self {
+ Self {
+ provider: CustomEnvProvider::new("IGGY_CONNECTORS_", &[]),
+ }
+ }
+}
+
+impl Provider for ConnectorsEnvProvider {
+ fn metadata(&self) -> Metadata {
+ Metadata::named("iggy-connectors-config")
+ }
+
+ fn data(&self) -> Result<figment::value::Map<Profile, Dict>,
figment::Error> {
+ self.provider.deserialize().map_err(|_| {
+ figment::Error::from("Cannot deserialize environment variables for
connectors config")
+ })
+ }
+}
diff --git a/core/connectors/runtime/src/context.rs
b/core/connectors/runtime/src/context.rs
index d7dea9f8..6dd005b6 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -40,7 +40,7 @@ pub fn init(
RuntimeContext {
sinks: SinkManager::new(map_sinks(config, sink_wrappers)),
sources: SourceManager::new(map_sources(config, source_wrappers)),
- api_key: config.http.api_key.clone(),
+ api_key: config.http_api.api_key.clone(),
}
}
diff --git a/core/connectors/runtime/src/main.rs
b/core/connectors/runtime/src/main.rs
index 4ca3369e..bb615ebe 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -16,13 +16,13 @@
* under the License.
*/
-use config::{Config, Environment, File};
use configs::{ConfigFormat, RuntimeConfig};
use dlopen2::wrapper::{Container, WrapperApi};
use dotenvy::dotenv;
use error::RuntimeError;
use figlet_rs::FIGfont;
use iggy::prelude::{Client, IggyConsumer, IggyProducer};
+use iggy_common::ConfigProvider;
use iggy_connector_sdk::{
StreamDecoder, StreamEncoder,
sink::ConsumeCallback,
@@ -124,14 +124,10 @@ async fn main() -> Result<(), RuntimeError> {
let config_path = config_path.unwrap_or_else(|_| "config".to_string());
info!("Starting Iggy Connectors Runtime, loading configuration from:
{config_path}...");
- let config: RuntimeConfig = Config::builder()
- .add_source(Config::try_from(&RuntimeConfig::default()).expect("Failed
to init config"))
- .add_source(File::with_name(&config_path).required(false))
- .add_source(Environment::with_prefix("IGGY_CONNECTORS").separator("_"))
- .build()
- .expect("Failed to build runtime config")
- .try_deserialize()
- .expect("Failed to deserialize runtime config");
+ let config: RuntimeConfig = RuntimeConfig::config_provider(config_path)
+ .load_config()
+ .await
+ .expect("Failed to load configuration");
std::fs::create_dir_all(&config.state.path).expect("Failed to create state
directory");
@@ -182,7 +178,7 @@ async fn main() -> Result<(), RuntimeError> {
let context = context::init(&config, &sink_wrappers, &source_wrappers);
let context = Arc::new(context);
- api::init(&config.http, context).await;
+ api::init(&config.http_api, context).await;
source::handle(source_wrappers);
sink::consume(sink_wrappers);
diff --git a/core/connectors/sinks/postgres_sink/src/lib.rs
b/core/connectors/sinks/postgres_sink/src/lib.rs
index 2ef6eb6b..0ea6cb7a 100644
--- a/core/connectors/sinks/postgres_sink/src/lib.rs
+++ b/core/connectors/sinks/postgres_sink/src/lib.rs
@@ -70,11 +70,16 @@ impl PostgresSink {
async fn connect(&mut self) -> Result<(), Error> {
let max_connections = self.config.max_connections.unwrap_or(10);
+ info!(
+ "Connecting to PostgreSQL database with max {} connections,
connection string: {}",
+ max_connections, self.config.connection_string
+ );
let pool = PgPoolOptions::new()
.max_connections(max_connections)
.connect(&self.config.connection_string)
.await
.map_err(|e| Error::InitError(format!("Failed to connect to
PostgreSQL: {e}")))?;
+ info!("Connected to PostgreSQL database");
sqlx::query("SELECT 1")
.execute(&pool)
diff --git a/core/integration/src/test_connectors_runtime.rs
b/core/integration/src/test_connectors_runtime.rs
index d28f67a7..9a203fc3 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -26,7 +26,11 @@ use std::process::{Child, Command};
use std::time::Duration;
use std::{collections::HashMap, net::TcpListener};
use tokio::time::sleep;
+use uuid::Uuid;
+use crate::bench_utils::get_random_path;
+
+const LOCAL_STATE_PREFIX: &str = "local_state_";
pub const CONSUMER_NAME: &str = "connectors";
#[derive(Debug)]
@@ -67,6 +71,7 @@ impl TestConnectorsRuntime {
"IGGY_CONNECTORS_IGGY_CONSUMER".to_string(),
CONSUMER_NAME.to_string(),
);
+ envs.insert("IGGY_CONNECTORS_STATE_PATH".to_string(),
get_random_path());
Self::create(envs, server_executable_path)
}
@@ -85,7 +90,7 @@ impl TestConnectorsRuntime {
pub fn start(&mut self) {
self.envs
- .entry("IGGY_CONNECTORS_HTTP_ADDRESS".to_string())
+ .entry("IGGY_CONNECTORS_HTTP_API_ADDRESS".to_string())
.or_insert(self.server_address.to_string());
let mut command = if let Some(server_executable_path) =
&self.server_executable_path {
Command::new(server_executable_path)
@@ -146,6 +151,10 @@ impl TestConnectorsRuntime {
self.child_handle.as_ref().unwrap().id()
}
+ pub fn get_random_path() -> String {
+ format!("{}{}", LOCAL_STATE_PREFIX, Uuid::now_v7().to_u128_le())
+ }
+
fn get_http_api_address(&self) -> String {
format!(
"http://{}:{}",
@@ -157,7 +166,7 @@ impl TestConnectorsRuntime {
pub async fn ensure_started(&self) {
let http_api_address = self.get_http_api_address();
let client = reqwest::Client::new();
- let max_retries = 3000;
+ let max_retries = 1000;
let mut retries = 0;
while let Err(error) = client.get(&http_api_address).send().await {
sleep(Duration::from_millis(20)).await;
diff --git a/core/integration/src/test_server.rs
b/core/integration/src/test_server.rs
index 2678eaee..f09c9369 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -30,12 +30,11 @@ use assert_cmd::prelude::CommandCargoExt;
use async_trait::async_trait;
use derive_more::Display;
use futures::executor::block_on;
-use iggy_common::TransportProtocol;
-use uuid::Uuid;
-
use iggy::prelude::UserStatus::Active;
use iggy::prelude::*;
-use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
+use iggy_common::{ConfigProvider, TransportProtocol};
+use server::configs::server::ServerConfig;
+use uuid::Uuid;
pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
@@ -310,13 +309,11 @@ impl TestServer {
fn wait_until_server_has_bound(&mut self) {
let config_path = format!("{}/runtime/current_config.toml",
self.local_data_path);
- let file_config_provider =
FileConfigProvider::new(config_path.clone());
-
let max_attempts = (MAX_PORT_WAIT_DURATION_S * 1000) /
SLEEP_INTERVAL_MS;
self.server_addrs.clear();
let config = block_on(async {
- let mut loaded_config = None;
+ let mut loaded_config: Option<ServerConfig> = None;
for _ in 0..max_attempts {
if !Path::new(&config_path).exists() {
@@ -328,7 +325,10 @@ impl TestServer {
sleep(Duration::from_millis(SLEEP_INTERVAL_MS));
continue;
}
- match file_config_provider.load_config().await {
+ match ServerConfig::config_provider(config_path.clone())
+ .load_config()
+ .await
+ {
Ok(config) => {
loaded_config = Some(config);
break;
diff --git a/core/integration/tests/config_provider/mod.rs
b/core/integration/tests/config_provider/mod.rs
index 5426807d..79de9689 100644
--- a/core/integration/tests/config_provider/mod.rs
+++ b/core/integration/tests/config_provider/mod.rs
@@ -16,22 +16,25 @@
* under the License.
*/
+use iggy_common::{ConfigProvider, ConfigurationError, FileConfigProvider};
use integration::file::{file_exists, get_root_path};
use serial_test::serial;
-use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
+use server::configs::server::{ServerConfig, ServerEnvProvider};
use std::env;
async fn scenario_parsing_from_file(extension: &str) {
let mut config_path = get_root_path().join("../configs/server");
assert!(config_path.set_extension(extension), "Cannot set extension");
let config_path = config_path.as_path().display().to_string();
- let config_provider = FileConfigProvider::new(config_path.clone());
+ let config_provider = get_file_config_provider();
assert!(
file_exists(&config_path),
"Config file not found: {config_path}"
);
assert!(
- config_provider.load_config().await.is_ok(),
+ ConfigProvider::<ServerConfig>::load_config(config_provider)
+ .await
+ .is_ok(),
"ConfigProvider failed to parse config from {config_path}"
);
}
@@ -71,9 +74,8 @@ async fn validate_custom_env_provider() {
env::set_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", "10s");
}
- let config_path = get_root_path().join("../configs/server.toml");
- let file_config_provider =
FileConfigProvider::new(config_path.as_path().display().to_string());
- let config = file_config_provider
+ let file_config_provider = get_file_config_provider();
+ let config: ServerConfig = file_config_provider
.load_config()
.await
.expect("Failed to load default server.toml config");
@@ -148,9 +150,8 @@ async fn validate_cluster_config_env_override() {
env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address);
}
- let config_path = get_root_path().join("../configs/server.toml");
- let file_config_provider =
FileConfigProvider::new(config_path.as_path().display().to_string());
- let config = file_config_provider
+ let file_config_provider = get_file_config_provider();
+ let config: ServerConfig = file_config_provider
.load_config()
.await
.expect("Failed to load server.toml config with cluster env
overrides");
@@ -217,9 +218,8 @@ async fn validate_cluster_partial_env_override() {
env::set_var("IGGY_CLUSTER_NODES_1_ADDRESS", expected_node_1_address);
}
- let config_path = get_root_path().join("../configs/server.toml");
- let file_config_provider =
FileConfigProvider::new(config_path.as_path().display().to_string());
- let config = file_config_provider
+ let file_config_provider = get_file_config_provider();
+ let config: ServerConfig = file_config_provider
.load_config()
.await
.expect("Failed to load server.toml config with partial cluster env
overrides");
@@ -260,11 +260,10 @@ async fn
validate_cluster_sparse_array_fails_with_missing_fields() {
env::set_var("IGGY_CLUSTER_NODES_5_ADDRESS", expected_node_5_address);
}
- let config_path = get_root_path().join("../configs/server.toml");
- let file_config_provider =
FileConfigProvider::new(config_path.as_path().display().to_string());
+ let file_config_provider = get_file_config_provider();
// This should fail because nodes 2-4 will be missing required fields
- let result = file_config_provider.load_config().await;
+ let result: Result<ServerConfig, ConfigurationError> =
file_config_provider.load_config().await;
assert!(
result.is_err(),
"Should fail to load config with sparse array due to missing required
fields in intermediate elements"
@@ -292,9 +291,8 @@ async fn validate_cluster_contiguous_array_override() {
env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address);
}
- let config_path = get_root_path().join("../configs/server.toml");
- let file_config_provider =
FileConfigProvider::new(config_path.as_path().display().to_string());
- let config = file_config_provider
+ let file_config_provider = get_file_config_provider();
+ let config: ServerConfig = file_config_provider
.load_config()
.await
.expect("Failed to load server.toml config with contiguous array
override");
@@ -323,3 +321,8 @@ async fn validate_cluster_contiguous_array_override() {
env::remove_var("IGGY_CLUSTER_NODES_2_ADDRESS");
}
}
+
+fn get_file_config_provider() -> FileConfigProvider<ServerEnvProvider> {
+ let config_path = get_root_path().join("../configs/server.toml");
+ ServerConfig::config_provider(config_path.as_path().display().to_string())
+}
diff --git a/core/integration/tests/connectors/mod.rs
b/core/integration/tests/connectors/mod.rs
index 789ca3ed..b437f8dc 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -23,34 +23,25 @@ use integration::{
test_connectors_runtime::TestConnectorsRuntime,
test_server::{ClientFactory, IpAddrKind, TestServer},
};
-use serial_test::parallel;
use std::collections::HashMap;
-mod sinks;
-mod sources;
+mod postgres;
-#[tokio::test]
-#[parallel]
-async fn connectors_runtime_should_start() {
- let mut infra = setup();
- infra.start_connectors_runtime(None, None).await;
-}
-
-fn setup() -> ConnectorsInfra {
+fn setup_runtime() -> ConnectorsInfra {
let mut iggy_envs = HashMap::new();
iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned());
let mut test_server = TestServer::new(Some(iggy_envs), true, None,
IpAddrKind::V4);
test_server.start();
ConnectorsInfra {
iggy_server: test_server,
- connectors_runtim: None,
+ connectors_runtime: None,
}
}
#[derive(Debug)]
struct ConnectorsInfra {
iggy_server: TestServer,
- connectors_runtim: Option<TestConnectorsRuntime>,
+ connectors_runtime: Option<TestConnectorsRuntime>,
}
impl ConnectorsInfra {
@@ -59,7 +50,7 @@ impl ConnectorsInfra {
config_path: Option<&str>,
envs: Option<HashMap<String, String>>,
) {
- if self.connectors_runtim.is_some() {
+ if self.connectors_runtime.is_some() {
return;
}
@@ -90,7 +81,7 @@ impl ConnectorsInfra {
TestConnectorsRuntime::with_iggy_address(&iggy_server_address,
all_envs);
connectors_runtime.start();
connectors_runtime.ensure_started().await;
- self.connectors_runtim = Some(connectors_runtime);
+ self.connectors_runtime = Some(connectors_runtime);
}
pub async fn create_client(&self) -> IggyClient {
diff --git a/core/integration/tests/connectors/sinks/postgres/mod.rs
b/core/integration/tests/connectors/postgres/mod.rs
similarity index 81%
rename from core/integration/tests/connectors/sinks/postgres/mod.rs
rename to core/integration/tests/connectors/postgres/mod.rs
index 8ab8fc8e..98cd0eea 100644
--- a/core/integration/tests/connectors/sinks/postgres/mod.rs
+++ b/core/integration/tests/connectors/postgres/mod.rs
@@ -16,11 +16,12 @@
* under the License.
*/
-use crate::connectors::setup;
+use crate::connectors::setup_runtime;
use iggy_binary_protocol::{StreamClient, TopicClient};
use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
use std::collections::HashMap;
use testcontainers_modules::{postgres, testcontainers::runners::AsyncRunner};
+use tokio::time;
#[tokio::test]
async fn given_valid_configuration_postgres_sink_should_start() {
@@ -34,11 +35,16 @@ async fn
given_valid_configuration_postgres_sink_should_start() {
.expect("Failed to get Postgres port");
let mut envs = HashMap::new();
+ let connection_string =
format!("postgres://postgres:postgres@localhost:{host_port}");
envs.insert(
"IGGY_CONNECTORS_SINKS_POSTGRES_CONFIG_CONNECTION_STRING".to_owned(),
- format!("postgres://postgres:postgres@localhost:{host_port}"),
+ connection_string,
);
- let mut infra = setup();
+ println!(
+ "Connection string: {}",
+ envs["IGGY_CONNECTORS_SINKS_POSTGRES_CONFIG_CONNECTION_STRING"]
+ );
+ let mut infra = setup_runtime();
let client = infra.create_client().await;
let stream_name = "test";
let topic_name = "test";
@@ -61,6 +67,8 @@ async fn
given_valid_configuration_postgres_sink_should_start() {
.await
.expect("Failed to create topic");
infra
- .start_connectors_runtime(Some("sinks/postgres/postgres.toml"),
Some(envs))
+ .start_connectors_runtime(Some("postgres/postgres.toml"), Some(envs))
.await;
+ time::sleep(std::time::Duration::from_secs(5)).await;
+ println!("Bye");
}
diff --git a/core/integration/tests/connectors/sinks/postgres/postgres.toml
b/core/integration/tests/connectors/postgres/postgres.toml
similarity index 100%
rename from core/integration/tests/connectors/sinks/postgres/postgres.toml
rename to core/integration/tests/connectors/postgres/postgres.toml
diff --git a/core/integration/tests/connectors/sources/postgres/mod.rs
b/core/integration/tests/connectors/postgres/postgres_sink.rs
similarity index 100%
copy from core/integration/tests/connectors/sources/postgres/mod.rs
copy to core/integration/tests/connectors/postgres/postgres_sink.rs
diff --git a/core/integration/tests/connectors/sources/postgres/mod.rs
b/core/integration/tests/connectors/postgres/postgres_source.rs
similarity index 100%
rename from core/integration/tests/connectors/sources/postgres/mod.rs
rename to core/integration/tests/connectors/postgres/postgres_source.rs
diff --git a/core/integration/tests/connectors/sinks/mod.rs
b/core/integration/tests/connectors/sinks/mod.rs
deleted file mode 100644
index 9c64d589..00000000
--- a/core/integration/tests/connectors/sinks/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-mod postgres;
diff --git a/core/integration/tests/connectors/sources/mod.rs
b/core/integration/tests/connectors/sources/mod.rs
deleted file mode 100644
index 9c64d589..00000000
--- a/core/integration/tests/connectors/sources/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-mod postgres;
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 049e6290..7f457b7e 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -58,7 +58,7 @@ dotenvy = { workspace = true }
enum_dispatch = { workspace = true }
error_set = { version = "0.8.5", features = ["tracing"] }
figlet-rs = { workspace = true }
-figment = { version = "0.10.19", features = ["toml", "env"] }
+figment = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
human-repr = { workspace = true }
diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs/mod.rs
index 936994ac..8c48e345 100644
--- a/core/server/src/configs/mod.rs
+++ b/core/server/src/configs/mod.rs
@@ -18,7 +18,6 @@
pub mod cache_indexes;
pub mod cluster;
-pub mod config_provider;
pub mod defaults;
pub mod displays;
pub mod http;
diff --git a/core/server/src/configs/server.rs
b/core/server/src/configs/server.rs
index 2e897922..c977075b 100644
--- a/core/server/src/configs/server.rs
+++ b/core/server/src/configs/server.rs
@@ -16,10 +16,10 @@
* under the License.
*/
+use crate::IGGY_ROOT_PASSWORD_ENV;
use crate::archiver::ArchiverKindType;
use crate::configs::COMPONENT;
use crate::configs::cluster::ClusterConfig;
-use crate::configs::config_provider::ConfigProviderKind;
use crate::configs::http::HttpConfig;
use crate::configs::quic::QuicConfig;
use crate::configs::system::SystemConfig;
@@ -27,14 +27,35 @@ use crate::configs::tcp::TcpConfig;
use crate::server_error::ConfigError;
use derive_more::Display;
use error_set::ErrContext;
+use figment::Metadata;
+use figment::Profile;
+use figment::Provider;
+use figment::providers::Format;
+use figment::providers::Toml;
+use figment::value::Dict;
+use iggy_common::ConfigProvider;
+use iggy_common::CustomEnvProvider;
+use iggy_common::FileConfigProvider;
use iggy_common::IggyDuration;
use iggy_common::Validatable;
use serde::{Deserialize, Serialize};
use serde_with::DisplayFromStr;
use serde_with::serde_as;
+use std::env;
use std::str::FromStr;
use std::sync::Arc;
+const DEFAULT_CONFIG_PROVIDER: &str = "file";
+const DEFAULT_CONFIG_PATH: &str = "core/configs/server.toml";
+const SECRET_KEYS: [&str; 6] = [
+ IGGY_ROOT_PASSWORD_ENV,
+ "IGGY_DATA_MAINTENANCE_ARCHIVER_S3_KEY_SECRET",
+ "IGGY_HTTP_JWT_ENCODING_SECRET",
+ "IGGY_HTTP_JWT_DECODING_SECRET",
+ "IGGY_TCP_TLS_PASSWORD",
+ "IGGY_SYSTEM_ENCRYPTION_KEY",
+];
+
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ServerConfig {
pub data_maintenance: DataMaintenanceConfig,
@@ -169,8 +190,38 @@ impl FromStr for TelemetryTransport {
}
}
+pub fn resolve(config_provider_type: &str) -> Result<ConfigProviderKind,
ConfigError> {
+ match config_provider_type {
+ DEFAULT_CONFIG_PROVIDER => {
+ let path =
+ env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_|
DEFAULT_CONFIG_PATH.to_string());
+ Ok(ConfigProviderKind::File(ServerConfig::config_provider(
+ path,
+ )))
+ }
+ _ => Err(ConfigError::InvalidConfigurationProvider {
+ provider_type: config_provider_type.to_string(),
+ }),
+ }
+}
+
+pub enum ConfigProviderKind {
+ File(FileConfigProvider<ServerEnvProvider>),
+}
+
+impl ConfigProviderKind {
+ pub async fn load_config(self) -> Result<ServerConfig, ConfigError> {
+ match self {
+ Self::File(p) => p
+ .load_config()
+ .await
+ .map_err(|_| ConfigError::CannotLoadConfiguration),
+ }
+ }
+}
+
impl ServerConfig {
- pub async fn load(config_provider: &ConfigProviderKind) ->
Result<ServerConfig, ConfigError> {
+ pub async fn load(config_provider: ConfigProviderKind) ->
Result<ServerConfig, ConfigError> {
let server_config = config_provider
.load_config()
.await
@@ -182,4 +233,34 @@ impl ServerConfig {
})?;
Ok(server_config)
}
+
+ pub fn config_provider(path: String) ->
FileConfigProvider<ServerEnvProvider> {
+ let default_config =
Toml::string(include_str!("../../../configs/server.toml"));
+ FileConfigProvider::new(path, default_config,
ServerEnvProvider::default())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct ServerEnvProvider {
+ provider: CustomEnvProvider<ServerConfig>,
+}
+
+impl Default for ServerEnvProvider {
+ fn default() -> Self {
+ Self {
+ provider: CustomEnvProvider::new("IGGY_", &SECRET_KEYS),
+ }
+ }
+}
+
+impl Provider for ServerEnvProvider {
+ fn metadata(&self) -> Metadata {
+ Metadata::named("iggy-server-config")
+ }
+
+ fn data(&self) -> Result<figment::value::Map<Profile, Dict>,
figment::Error> {
+ self.provider.deserialize().map_err(|_| {
+ figment::Error::from("Cannot deserialize environment variables for
server config")
+ })
+ }
}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 51c3cabb..1ed5e438 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -28,7 +28,7 @@ use
server::channels::commands::print_sysinfo::SysInfoPrintExecutor;
use server::channels::commands::save_messages::SaveMessagesExecutor;
use server::channels::commands::verify_heartbeats::VerifyHeartbeatsExecutor;
use server::channels::handler::BackgroundServerCommandHandler;
-use server::configs::config_provider;
+use server::configs;
use server::configs::server::ServerConfig;
use server::http::http_server;
#[cfg(not(feature = "tokio-console"))]
@@ -68,8 +68,8 @@ async fn main() -> Result<(), ServerError> {
let args = Args::parse();
- let config_provider = config_provider::resolve(&args.config_provider)?;
- let config = ServerConfig::load(&config_provider).await?;
+ let config_provider = configs::server::resolve(&args.config_provider)?;
+ let config = ServerConfig::load(config_provider).await?;
if args.fresh {
let system_path = config.system.get_system_path();
if tokio::fs::metadata(&system_path).await.is_ok() {