This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_test
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_test by this push:
     new ce45bde6 Add hashmap config override, improve configs for connectors 
and mcp
ce45bde6 is described below

commit ce45bde663dc74bc1d4361c6c0257ceab0476622
Author: spetz <[email protected]>
AuthorDate: Mon Sep 22 11:21:42 2025 +0200

    Add hashmap config override, improve configs for connectors and mcp
---
 Cargo.lock                                         |  99 +-------
 Cargo.toml                                         |   1 -
 DEPENDENCIES.md                                    |   9 -
 core/ai/mcp/Cargo.toml                             |   3 +-
 core/ai/mcp/src/configs.rs                         |  48 ++++
 core/ai/mcp/src/main.rs                            |  15 +-
 core/common/src/configs/mod.rs                     | 265 +++++++++++++++++++++
 core/connectors/runtime/Cargo.toml                 |   1 -
 core/connectors/runtime/config.toml                | 134 -----------
 .../runtime/{config.toml => example_config.toml}   |   0
 core/connectors/runtime/src/configs.rs             |   6 +-
 core/connectors/sdk/src/sink.rs                    |   9 +-
 core/integration/src/test_connectors_runtime.rs    |   7 +-
 13 files changed, 334 insertions(+), 263 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 192783f6..644474fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -426,12 +426,6 @@ dependencies = [
  "password-hash",
 ]
 
-[[package]]
-name = "arraydeque"
-version = "0.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"
-
 [[package]]
 name = "arrayref"
 version = "0.3.9"
@@ -1572,26 +1566,6 @@ dependencies = [
  "crossbeam-utils",
 ]
 
-[[package]]
-name = "config"
-version = "0.15.16"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "cef036f0ecf99baef11555578630e2cca559909b4c50822dbba828c252d21c49"
-dependencies = [
- "async-trait",
- "convert_case 0.6.0",
- "json5",
- "pathdiff",
- "ron",
- "rust-ini",
- "serde-untagged",
- "serde_core",
- "serde_json",
- "toml 0.9.6",
- "winnow 0.7.13",
- "yaml-rust2",
-]
-
 [[package]]
 name = "console"
 version = "0.15.11"
@@ -2416,17 +2390,6 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
 
-[[package]]
-name = "erased-serde"
-version = "0.4.8"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b"
-dependencies = [
- "serde",
- "serde_core",
- "typeid",
-]
-
 [[package]]
 name = "err_trail"
 version = "0.8.5"
@@ -4038,7 +4001,6 @@ version = "0.1.0"
 dependencies = [
  "axum 0.8.4",
  "axum-server",
- "config",
  "dashmap",
  "dlopen2",
  "dotenvy",
@@ -4070,10 +4032,11 @@ version = "0.1.0"
 dependencies = [
  "axum 0.8.4",
  "axum-server",
- "config",
  "dotenvy",
  "figlet-rs",
+ "figment",
  "iggy",
+ "iggy_common",
  "rmcp",
  "serde",
  "serde_json",
@@ -4559,17 +4522,6 @@ dependencies = [
  "wasm-bindgen",
 ]
 
-[[package]]
-name = "json5"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
-dependencies = [
- "pest",
- "pest_derive",
- "serde",
-]
-
 [[package]]
 name = "jsonwebtoken"
 version = "9.3.1"
@@ -5760,12 +5712,6 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
-[[package]]
-name = "pathdiff"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
-
 [[package]]
 name = "pbkdf2"
 version = "0.12.2"
@@ -6797,18 +6743,6 @@ dependencies = [
  "syn 2.0.106",
 ]
 
-[[package]]
-name = "ron"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
-dependencies = [
- "base64 0.21.7",
- "bitflags 2.9.4",
- "serde",
- "serde_derive",
-]
-
 [[package]]
 name = "route-recognizer"
 version = "0.3.1"
@@ -7200,18 +7134,6 @@ dependencies = [
  "serde_derive",
 ]
 
-[[package]]
-name = "serde-untagged"
-version = "0.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f9faf48a4a2d2693be24c6289dbe26552776eb7737074e6722891fadbe6c5058"
-dependencies = [
- "erased-serde",
- "serde",
- "serde_core",
- "typeid",
-]
-
 [[package]]
 name = "serde-wasm-bindgen"
 version = "0.5.0"
@@ -8784,12 +8706,6 @@ dependencies = [
  "syn 2.0.106",
 ]
 
-[[package]]
-name = "typeid"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
-
 [[package]]
 name = "typenum"
 version = "1.18.0"
@@ -9850,17 +9766,6 @@ dependencies = [
  "rustix",
 ]
 
-[[package]]
-name = "yaml-rust2"
-version = "0.10.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2462ea039c445496d8793d052e13787f2b90e750b833afee748e601c17621ed9"
-dependencies = [
- "arraydeque",
- "encoding_rs",
- "hashlink",
-]
-
 [[package]]
 name = "yansi"
 version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 1f60b229..8c43f609 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -82,7 +82,6 @@ chrono = { version = "0.4.42", features = ["serde"] }
 clap = { version = "4.5.47", features = ["derive", "wrap_help"] }
 colored = "3.0.0"
 comfy-table = "7.2.1"
-config = { version = "0.15.16" }
 console-subscriber = "0.4.1"
 crc32fast = "1.5.0"
 crossbeam = "0.8.4"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index e370afda..01537720 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -34,7 +34,6 @@ anymap2: 0.13.0, "Apache-2.0 OR MIT",
 arbitrary: 1.4.2, "Apache-2.0 OR MIT",
 arc-swap: 1.7.1, "Apache-2.0 OR MIT",
 argon2: 0.5.3, "Apache-2.0 OR MIT",
-arraydeque: 0.5.1, "Apache-2.0 OR MIT",
 arrayref: 0.3.9, "BSD-2-Clause",
 arrayvec: 0.7.6, "Apache-2.0 OR MIT",
 assert_cmd: 2.0.17, "Apache-2.0 OR MIT",
@@ -134,7 +133,6 @@ compact_str: 0.7.1, "MIT",
 compression-codecs: 0.4.30, "Apache-2.0 OR MIT",
 compression-core: 0.4.29, "Apache-2.0 OR MIT",
 concurrent-queue: 2.5.0, "Apache-2.0 OR MIT",
-config: 0.15.16, "Apache-2.0 OR MIT",
 console: 0.15.11, "MIT",
 console-api: 0.8.1, "MIT",
 console-subscriber: 0.4.1, "MIT",
@@ -218,7 +216,6 @@ enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
 env_filter: 0.1.3, "Apache-2.0 OR MIT",
 env_logger: 0.11.8, "Apache-2.0 OR MIT",
 equivalent: 1.0.2, "Apache-2.0 OR MIT",
-erased-serde: 0.4.8, "Apache-2.0 OR MIT",
 err_trail: 0.8.5, "Apache-2.0",
 errno: 0.3.14, "Apache-2.0 OR MIT",
 error_set: 0.8.5, "Apache-2.0",
@@ -393,7 +390,6 @@ jni: 0.21.1, "Apache-2.0 OR MIT",
 jni-sys: 0.3.0, "Apache-2.0 OR MIT",
 jobserver: 0.1.34, "Apache-2.0 OR MIT",
 js-sys: 0.3.80, "Apache-2.0 OR MIT",
-json5: 0.4.1, "ISC",
 jsonwebtoken: 9.3.1, "MIT",
 jwalk: 0.8.1, "MIT",
 keyring: 3.6.3, "Apache-2.0 OR MIT",
@@ -512,7 +508,6 @@ parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
 passterm: 2.0.1, "BSD-3-Clause",
 password-hash: 0.5.0, "Apache-2.0 OR MIT",
 paste: 1.0.15, "Apache-2.0 OR MIT",
-pathdiff: 0.2.3, "Apache-2.0 OR MIT",
 pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
 pear: 0.2.9, "Apache-2.0 OR MIT",
 pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
@@ -606,7 +601,6 @@ rkyv: 0.7.45, "MIT",
 rkyv_derive: 0.7.45, "MIT",
 rmcp: 0.6.4, "MIT",
 rmcp-macros: 0.6.4, "MIT",
-ron: 0.8.1, "Apache-2.0 OR MIT",
 route-recognizer: 0.3.1, "MIT",
 rsa: 0.9.8, "Apache-2.0 OR MIT",
 rust-ini: 0.21.3, "MIT",
@@ -643,7 +637,6 @@ security-framework: 3.4.0, "Apache-2.0 OR MIT",
 security-framework-sys: 2.15.0, "Apache-2.0 OR MIT",
 semver: 1.0.27, "Apache-2.0 OR MIT",
 serde: 1.0.225, "Apache-2.0 OR MIT",
-serde-untagged: 0.1.9, "Apache-2.0 OR MIT",
 serde-wasm-bindgen: 0.5.0, "MIT",
 serde-wasm-bindgen: 0.6.5, "MIT",
 serde_core: 1.0.225, "Apache-2.0 OR MIT",
@@ -769,7 +762,6 @@ try-lock: 0.2.5, "MIT",
 twox-hash: 2.1.2, "MIT",
 typed-builder: 0.15.2, "Apache-2.0 OR MIT",
 typed-builder-macro: 0.15.2, "Apache-2.0 OR MIT",
-typeid: 1.0.3, "Apache-2.0 OR MIT",
 typenum: 1.18.0, "Apache-2.0 OR MIT",
 ucd-trie: 0.1.7, "Apache-2.0 OR MIT",
 ulid: 1.2.1, "MIT",
@@ -894,7 +886,6 @@ wit-bindgen: 0.46.0, "Apache-2.0 OR Apache-2.0 WITH 
LLVM-exception OR MIT",
 writeable: 0.6.1, "Unicode-3.0",
 wyz: 0.5.1, "MIT",
 xattr: 1.5.1, "Apache-2.0 OR MIT",
-yaml-rust2: 0.10.4, "Apache-2.0 OR MIT",
 yansi: 1.0.1, "Apache-2.0 OR MIT",
 yasna: 0.5.2, "Apache-2.0 OR MIT",
 yew: 0.21.0, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 206dc351..68e733c1 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -29,10 +29,11 @@ readme = "README.md"
 [dependencies]
 axum = { workspace = true }
 axum-server = { workspace = true }
-config = { workspace = true }
 dotenvy = { workspace = true }
 figlet-rs = { workspace = true }
+figment = { workspace = true }
 iggy = { workspace = true }
+iggy_common = { workspace = true }
 rmcp = { version = "0.6.4", features = [
     "server",
     "transport-io",
diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs
index c4327e72..1f2e6672 100644
--- a/core/ai/mcp/src/configs.rs
+++ b/core/ai/mcp/src/configs.rs
@@ -17,7 +17,13 @@
  */
 
 use axum::http::Method;
+use figment::{
+    Metadata, Profile, Provider,
+    providers::{Format, Toml},
+    value::Dict,
+};
 use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
+use iggy_common::{CustomEnvProvider, FileConfigProvider};
 use serde::{Deserialize, Serialize};
 use strum::Display;
 use tower_http::cors::{AllowOrigin, CorsLayer};
@@ -177,3 +183,45 @@ pub fn configure_cors(config: &HttpCorsConfig) -> 
CorsLayer {
         .allow_credentials(config.allow_credentials)
         .allow_private_network(config.allow_private_network)
 }
+
+impl std::fmt::Display for McpServerConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "McpServerConfig {{ http: {:?}, iggy: {:?}, permissions: {:?}, 
transport: {:?} }}",
+            self.http, self.iggy, self.permissions, self.transport
+        )
+    }
+}
+
+impl McpServerConfig {
+    pub fn config_provider(path: String) -> 
FileConfigProvider<McpServerEnvProvider> {
+        let default_config = 
Toml::string(include_str!("../../../ai/mcp/config.toml"));
+        FileConfigProvider::new(path, default_config, 
McpServerEnvProvider::default())
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct McpServerEnvProvider {
+    provider: CustomEnvProvider<McpServerConfig>,
+}
+
+impl Default for McpServerEnvProvider {
+    fn default() -> Self {
+        Self {
+            provider: CustomEnvProvider::new("IGGY_MCP_", &[]),
+        }
+    }
+}
+
+impl Provider for McpServerEnvProvider {
+    fn metadata(&self) -> Metadata {
+        Metadata::named("iggy-mcp-server-config")
+    }
+
+    fn data(&self) -> Result<figment::value::Map<Profile, Dict>, 
figment::Error> {
+        self.provider.deserialize().map_err(|_| {
+            figment::Error::from("Cannot deserialize environment variables for 
MCP config")
+        })
+    }
+}
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index b118720d..1633fe1d 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -16,12 +16,12 @@
  * under the License.
  */
 
-use config::{Config, Environment, File};
 use configs::{McpServerConfig, McpTransport};
 use dotenvy::dotenv;
 use error::McpRuntimeError;
 use figlet_rs::FIGfont;
 use iggy::prelude::{Client, Identifier};
+use iggy_common::ConfigProvider;
 use rmcp::{ServiceExt, model::ErrorData, transport::stdio};
 use service::IggyService;
 use std::{env, sync::Arc};
@@ -68,14 +68,11 @@ async fn main() -> Result<(), McpRuntimeError> {
 
     let config_path = config_path.unwrap_or_else(|_| "config".to_string());
     eprintln!("Configuration file path: {config_path}");
-    let config: McpServerConfig = Config::builder()
-        
.add_source(Config::try_from(&McpServerConfig::default()).expect("Failed to 
init config"))
-        .add_source(File::with_name(&config_path).required(false))
-        .add_source(Environment::with_prefix("IGGY_MCP").separator("_"))
-        .build()
-        .expect("Failed to build runtime config")
-        .try_deserialize()
-        .expect("Failed to deserialize runtime config");
+
+    let config: McpServerConfig = McpServerConfig::config_provider(config_path)
+        .load_config()
+        .await
+        .expect("Failed to load configuration");
 
     let transport = config.transport;
     if transport == McpTransport::Stdio {
diff --git a/core/common/src/configs/mod.rs b/core/common/src/configs/mod.rs
index c4cb8584..7a10d853 100644
--- a/core/common/src/configs/mod.rs
+++ b/core/common/src/configs/mod.rs
@@ -212,6 +212,19 @@ impl<T: Serialize + DeserializeOwned + Default + Display> 
CustomEnvProvider<T> {
         keys: &[String],
         value: FigmentValue,
     ) {
+        if keys.is_empty() {
+            return;
+        }
+
+        // Try to detect HashMap patterns
+        if let Some(hashmap_result) =
+            Self::try_handle_hashmap_override(source, target, keys, 
value.clone())
+            && hashmap_result
+        {
+            return;
+        }
+
+        // Fallback to original logic for non-HashMap patterns
         let mut current_source = source;
         let mut current_target = target;
         let mut combined_keys = Vec::new();
@@ -249,6 +262,258 @@ impl<T: Serialize + DeserializeOwned + Default + Display> 
CustomEnvProvider<T> {
         }
     }
 
+    fn try_handle_hashmap_override(
+        source: &Dict,
+        target: &mut Dict,
+        keys: &[String],
+        value: FigmentValue,
+    ) -> Option<bool> {
+        if keys.len() < 2 {
+            return Some(false);
+        }
+
+        let potential_hashmap_key = &keys[0];
+
+        // Check if this is actually a HashMap field in the source
+        let source_hashmap = match source.get(potential_hashmap_key) {
+            Some(FigmentValue::Dict(_, hashmap_dict)) => {
+                // For it to be a HashMap, it should either:
+                // 1. Have Dict values (actual HashMap entries)
+                // 2. Be empty (empty HashMap)
+                // 3. But NOT have regular struct fields
+
+                let has_dict_values = hashmap_dict
+                    .values()
+                    .any(|v| matches!(v, FigmentValue::Dict(_, _)));
+                let has_non_dict_values = hashmap_dict
+                    .values()
+                    .any(|v| !matches!(v, FigmentValue::Dict(_, _)));
+
+                // If it has non-Dict values mixed with Dict values, it's 
probably a regular struct, not a HashMap
+                if has_non_dict_values && has_dict_values {
+                    return Some(false);
+                }
+
+                // If it only has non-Dict values, it's definitely a regular 
struct
+                if has_non_dict_values && !has_dict_values {
+                    return Some(false);
+                }
+
+                // If it's empty or only has Dict values, it could be a HashMap
+                hashmap_dict
+            }
+            _ => return Some(false),
+        };
+
+        // Try to find the best HashMap entry key
+        if let Some((entry_key, remaining_keys)) =
+            Self::find_best_hashmap_split(source_hashmap, &keys[1..])
+        {
+            return Some(Self::apply_hashmap_override(
+                target,
+                potential_hashmap_key,
+                &entry_key,
+                &remaining_keys,
+                source_hashmap,
+                value,
+            ));
+        }
+
+        Some(false)
+    }
+
+    fn find_best_hashmap_split(
+        source_hashmap: &Dict,
+        keys: &[String],
+    ) -> Option<(String, Vec<String>)> {
+        if keys.is_empty() {
+            return None;
+        }
+
+        // First, try existing HashMap entry keys if any exist
+        if !source_hashmap.is_empty() {
+            for (existing_key, existing_value) in source_hashmap {
+                if let FigmentValue::Dict(_, entry_dict) = existing_value {
+                    // Try different ways to match this existing key
+                    for split_point in 1..=keys.len() {
+                        let candidate_key = keys[0..split_point].join("_");
+
+                        if candidate_key == *existing_key {
+                            let remaining_keys = keys[split_point..].to_vec();
+
+                            // Validate that the remaining keys form a valid 
path in the entry
+                            if remaining_keys.is_empty()
+                                || Self::is_valid_field_path(entry_dict, 
&remaining_keys)
+                            {
+                                return Some((existing_key.clone(), 
remaining_keys));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        // For empty HashMaps or when no existing keys match, use simple split
+        let simple_entry_key = keys[0].clone();
+        let simple_remaining = keys[1..].to_vec();
+        Some((simple_entry_key, simple_remaining))
+    }
+
+    fn is_valid_field_path(dict: &Dict, keys: &[String]) -> bool {
+        if keys.is_empty() {
+            return true;
+        }
+
+        // Try different combinations to account for fields with underscores
+        for split_point in 1..=keys.len() {
+            let field_name = keys[0..split_point].join("_");
+            let remaining_keys = &keys[split_point..];
+
+            if let Some(field_value) = dict.get(&field_name) {
+                if remaining_keys.is_empty() {
+                    return true;
+                } else if let FigmentValue::Dict(_, nested_dict) = field_value 
{
+                    return Self::is_valid_field_path(nested_dict, 
remaining_keys);
+                }
+            }
+        }
+
+        // Also try the original approach with combined keys
+        let mut current_dict = dict;
+        let mut combined_keys = Vec::new();
+
+        for key in keys {
+            combined_keys.push(key.clone());
+            let combined_field = combined_keys.join("_");
+
+            if let Some(field_value) = current_dict.get(&combined_field) {
+                match field_value {
+                    FigmentValue::Dict(_, nested_dict) => {
+                        current_dict = nested_dict;
+                        combined_keys.clear();
+                    }
+                    _ => return true,
+                }
+            }
+        }
+
+        true
+    }
+
+    fn apply_hashmap_override(
+        target: &mut Dict,
+        hashmap_key: &str,
+        entry_key: &str,
+        remaining_keys: &[String],
+        source_hashmap: &Dict,
+        value: FigmentValue,
+    ) -> bool {
+        // Ensure the HashMap exists in target
+        target
+            .entry(hashmap_key.to_string())
+            .or_insert_with(|| FigmentValue::Dict(Tag::Default, Dict::new()));
+
+        if let Some(FigmentValue::Dict(_, target_hashmap)) = 
target.get_mut(hashmap_key) {
+            // Ensure the specific entry exists in the HashMap
+            target_hashmap
+                .entry(entry_key.to_string())
+                .or_insert_with(|| {
+                    if let Some(existing_entry) = 
source_hashmap.get(entry_key) {
+                        existing_entry.clone()
+                    } else {
+                        FigmentValue::Dict(Tag::Default, Dict::new())
+                    }
+                });
+
+            if remaining_keys.is_empty() {
+                target_hashmap.insert(entry_key.to_string(), value);
+            } else if let Some(FigmentValue::Dict(_, entry_dict)) =
+                target_hashmap.get_mut(entry_key)
+            {
+                // Check if we need special handling for serde_json::Value 
fields
+                if let Some((json_field, json_keys)) =
+                    Self::find_json_value_field_split(entry_dict, 
remaining_keys)
+                {
+                    Self::handle_json_value_override(entry_dict, &json_field, 
json_keys, value);
+                } else {
+                    Self::insert_overridden_values_from_env(
+                        &Dict::new(),
+                        entry_dict,
+                        remaining_keys.to_vec(),
+                        value,
+                    );
+                }
+            }
+            return true;
+        }
+
+        false
+    }
+
+    fn find_json_value_field_split(dict: &Dict, keys: &[String]) -> 
Option<(String, Vec<String>)> {
+        if keys.is_empty() {
+            return None;
+        }
+
+        // Try different split points to find a field that exists
+        for split_point in 1..=keys.len() {
+            let potential_json_field = keys[0..split_point].join("_");
+            let remaining_keys = keys[split_point..].to_vec();
+
+            if dict.contains_key(&potential_json_field) && 
!remaining_keys.is_empty() {
+                return Some((potential_json_field, remaining_keys));
+            }
+        }
+
+        // If no existing field matches and we have multiple keys, use first 
key
+        if keys.len() > 1 {
+            let potential_new_field = keys[0].clone();
+            let remaining_keys = keys[1..].to_vec();
+            return Some((potential_new_field, remaining_keys));
+        }
+
+        None
+    }
+
+    fn handle_json_value_override(
+        entry_dict: &mut Dict,
+        json_field: &str,
+        json_keys: Vec<String>,
+        value: FigmentValue,
+    ) {
+        // Ensure the JSON value field exists
+        entry_dict
+            .entry(json_field.to_string())
+            .or_insert_with(|| FigmentValue::Dict(Tag::Default, Dict::new()));
+
+        // Get or create the JSON field as a Dict
+        if let Some(json_value) = entry_dict.get_mut(json_field) {
+            let json_dict = match json_value {
+                FigmentValue::Dict(_, dict) => dict,
+                _ => {
+                    *json_value = FigmentValue::Dict(Tag::Default, 
Dict::new());
+                    if let FigmentValue::Dict(_, dict) = json_value {
+                        dict
+                    } else {
+                        return;
+                    }
+                }
+            };
+
+            Self::set_nested_json_field(json_dict, &json_keys, value);
+        }
+    }
+
+    fn set_nested_json_field(dict: &mut Dict, keys: &[String], value: 
FigmentValue) {
+        if keys.is_empty() {
+            return;
+        }
+
+        // Always try the full field name first (with underscores)
+        let full_field_name = keys.join("_");
+        dict.insert(full_field_name, value);
+    }
+
     fn navigate_to_dict<'a>(target: &'a mut Dict, path: &[String]) -> 
Option<&'a mut Dict> {
         if path.is_empty() {
             return Some(target);
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
index 46e1bb60..759f12a2 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -31,7 +31,6 @@ readme = "README.md"
 [dependencies]
 axum = { workspace = true }
 axum-server = { workspace = true }
-config = { workspace = true }
 dashmap = { workspace = true }
 dlopen2 = { workspace = true }
 dotenvy = { workspace = true }
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
index ea868f8e..f106cb5d 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -42,137 +42,3 @@ password = "iggy"
 
 [state]
 path = "local_state"
-
-[sinks.stdout]
-enabled = true
-name = "Stdout sink"
-path = "target/release/libiggy_connector_stdout_sink"
-
-[[sinks.stdout.streams]]
-stream = "example_stream"
-topics = ["example_topic"]
-schema = "json"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "stdout_sink_connector"
-
-[sinks.stdout.config]
-print_payload = false
-
-[sinks.stdout.transforms.add_fields]
-enabled = true
-
-[[sinks.stdout.transforms.add_fields.fields]]
-key = "message"
-value.static = "hello"
-
-[sources.random]
-enabled = true
-name = "Random source"
-path = "target/release/libiggy_connector_random_source"
-config_format = "json"
-
-[[sources.random.streams]]
-stream = "example_stream"
-topic = "example_topic"
-schema = "json"
-batch_length = 1000
-linger_time = "5ms"
-
-[sources.random.config]
-interval = "100ms"
-# max_count = 1000
-messages_range = [10, 50]
-payload_size = 200
-
-[sources.random.transforms.add_fields]
-enabled = true
-
-[[sources.random.transforms.add_fields.fields]]
-key = "test_field"
-value.static = "hello!"
-
-[sinks.quickwit]
-enabled = true
-name = "Quickwit sink 1"
-path = "target/release/libiggy_connector_quickwit_sink"
-config_format = "yaml"
-
-[[sinks.quickwit.streams]]
-stream = "qw"
-topics = ["records"]
-schema = "json"
-batch_length = 1000
-poll_interval = "5ms"
-consumer_group = "qw_sink_connector"
-
-[sinks.quickwit.transforms.add_fields]
-enabled = true
-
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "service_name"
-value.static = "qw_connector"
-
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "timestamp"
-value.computed = "timestamp_millis"
-
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "random_id"
-value.computed = "uuid_v7"
-
-[sinks.quickwit.transforms.delete_fields]
-enabled = true
-fields = ["email", "created_at"]
-
-[sinks.quickwit.config]
-url = "http://localhost:7280";
-index = """
-version: 0.9
-
-index_id: events
-
-doc_mapping:
-  mode: strict
-  field_mappings:
-    - name: timestamp
-      type: datetime
-      input_formats: [unix_timestamp]
-      output_format: unix_timestamp_nanos
-      indexed: false
-      fast: true
-      fast_precision: milliseconds
-    - name: service_name
-      type: text
-      tokenizer: raw
-      fast: true
-    - name: random_id
-      type: text
-      tokenizer: raw
-      fast: true
-    - name: user_id
-      type: text
-      tokenizer: raw
-      fast: true
-    - name: user_type
-      type: u64
-      fast: true
-    - name: source
-      type: text
-      tokenizer: default
-    - name: state
-      type: text
-      tokenizer: default
-    - name: message
-      type: text
-      tokenizer: default
-
-  timestamp_field: timestamp
-
-indexing_settings:
-  commit_timeout_secs: 10
-
-retention:
-  period: 7 days
-  schedule: daily
-"""
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/example_config.toml
similarity index 100%
copy from core/connectors/runtime/config.toml
copy to core/connectors/runtime/example_config.toml
diff --git a/core/connectors/runtime/src/configs.rs 
b/core/connectors/runtime/src/configs.rs
index c7a5b41f..8974798d 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -125,11 +125,7 @@ impl std::fmt::Display for RuntimeConfig {
         write!(
             f,
             "RuntimeConfig {{ http: {:?}, iggy: {:?}, sinks: {:?}, sources: 
{:?}, state: {:?} }}",
-            self.http_api,
-            self.iggy,
-            self.sinks.keys(),
-            self.sources.keys(),
-            self.state
+            self.http_api, self.iggy, self.sinks, self.sources, self.state
         )
     }
 }
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index dd1f4f14..8e04f6f0 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -73,9 +73,12 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
                 return -1;
             };
 
-            let Ok(config) = serde_json::from_str(config_str) else {
-                error!("Failed to parse configuration for sink connector with 
ID: {id}",);
-                return -1;
+            let config = match serde_json::from_str::<C>(config_str) {
+                Ok(cfg) => cfg,
+                Err(err) => {
+                    error!("Failed to parse configuration for sink connector 
with ID: {id}. {err}",);
+                    return -1;
+                }
             };
 
             let mut sink = factory(id, config);
diff --git a/core/integration/src/test_connectors_runtime.rs 
b/core/integration/src/test_connectors_runtime.rs
index 9a203fc3..00e419b9 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -28,8 +28,6 @@ use std::{collections::HashMap, net::TcpListener};
 use tokio::time::sleep;
 use uuid::Uuid;
 
-use crate::bench_utils::get_random_path;
-
 const LOCAL_STATE_PREFIX: &str = "local_state_";
 pub const CONSUMER_NAME: &str = "connectors";
 
@@ -71,7 +69,10 @@ impl TestConnectorsRuntime {
             "IGGY_CONNECTORS_IGGY_CONSUMER".to_string(),
             CONSUMER_NAME.to_string(),
         );
-        envs.insert("IGGY_CONNECTORS_STATE_PATH".to_string(), 
get_random_path());
+        envs.insert(
+            "IGGY_CONNECTORS_STATE_PATH".to_string(),
+            Self::get_random_path(),
+        );
         Self::create(envs, server_executable_path)
     }
 

Reply via email to