This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_test
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_test by this push:
new ce45bde6 Add hashmap config override, improve configs for connectors
and mcp
ce45bde6 is described below
commit ce45bde663dc74bc1d4361c6c0257ceab0476622
Author: spetz <[email protected]>
AuthorDate: Mon Sep 22 11:21:42 2025 +0200
Add hashmap config override, improve configs for connectors and mcp
---
Cargo.lock | 99 +-------
Cargo.toml | 1 -
DEPENDENCIES.md | 9 -
core/ai/mcp/Cargo.toml | 3 +-
core/ai/mcp/src/configs.rs | 48 ++++
core/ai/mcp/src/main.rs | 15 +-
core/common/src/configs/mod.rs | 265 +++++++++++++++++++++
core/connectors/runtime/Cargo.toml | 1 -
core/connectors/runtime/config.toml | 134 -----------
.../runtime/{config.toml => example_config.toml} | 0
core/connectors/runtime/src/configs.rs | 6 +-
core/connectors/sdk/src/sink.rs | 9 +-
core/integration/src/test_connectors_runtime.rs | 7 +-
13 files changed, 334 insertions(+), 263 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 192783f6..644474fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -426,12 +426,6 @@ dependencies = [
"password-hash",
]
-[[package]]
-name = "arraydeque"
-version = "0.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"
-
[[package]]
name = "arrayref"
version = "0.3.9"
@@ -1572,26 +1566,6 @@ dependencies = [
"crossbeam-utils",
]
-[[package]]
-name = "config"
-version = "0.15.16"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cef036f0ecf99baef11555578630e2cca559909b4c50822dbba828c252d21c49"
-dependencies = [
- "async-trait",
- "convert_case 0.6.0",
- "json5",
- "pathdiff",
- "ron",
- "rust-ini",
- "serde-untagged",
- "serde_core",
- "serde_json",
- "toml 0.9.6",
- "winnow 0.7.13",
- "yaml-rust2",
-]
-
[[package]]
name = "console"
version = "0.15.11"
@@ -2416,17 +2390,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
-[[package]]
-name = "erased-serde"
-version = "0.4.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b"
-dependencies = [
- "serde",
- "serde_core",
- "typeid",
-]
-
[[package]]
name = "err_trail"
version = "0.8.5"
@@ -4038,7 +4001,6 @@ version = "0.1.0"
dependencies = [
"axum 0.8.4",
"axum-server",
- "config",
"dashmap",
"dlopen2",
"dotenvy",
@@ -4070,10 +4032,11 @@ version = "0.1.0"
dependencies = [
"axum 0.8.4",
"axum-server",
- "config",
"dotenvy",
"figlet-rs",
+ "figment",
"iggy",
+ "iggy_common",
"rmcp",
"serde",
"serde_json",
@@ -4559,17 +4522,6 @@ dependencies = [
"wasm-bindgen",
]
-[[package]]
-name = "json5"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
-dependencies = [
- "pest",
- "pest_derive",
- "serde",
-]
-
[[package]]
name = "jsonwebtoken"
version = "9.3.1"
@@ -5760,12 +5712,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
-[[package]]
-name = "pathdiff"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
-
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -6797,18 +6743,6 @@ dependencies = [
"syn 2.0.106",
]
-[[package]]
-name = "ron"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
-dependencies = [
- "base64 0.21.7",
- "bitflags 2.9.4",
- "serde",
- "serde_derive",
-]
-
[[package]]
name = "route-recognizer"
version = "0.3.1"
@@ -7200,18 +7134,6 @@ dependencies = [
"serde_derive",
]
-[[package]]
-name = "serde-untagged"
-version = "0.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f9faf48a4a2d2693be24c6289dbe26552776eb7737074e6722891fadbe6c5058"
-dependencies = [
- "erased-serde",
- "serde",
- "serde_core",
- "typeid",
-]
-
[[package]]
name = "serde-wasm-bindgen"
version = "0.5.0"
@@ -8784,12 +8706,6 @@ dependencies = [
"syn 2.0.106",
]
-[[package]]
-name = "typeid"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
-
[[package]]
name = "typenum"
version = "1.18.0"
@@ -9850,17 +9766,6 @@ dependencies = [
"rustix",
]
-[[package]]
-name = "yaml-rust2"
-version = "0.10.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2462ea039c445496d8793d052e13787f2b90e750b833afee748e601c17621ed9"
-dependencies = [
- "arraydeque",
- "encoding_rs",
- "hashlink",
-]
-
[[package]]
name = "yansi"
version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 1f60b229..8c43f609 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -82,7 +82,6 @@ chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.47", features = ["derive", "wrap_help"] }
colored = "3.0.0"
comfy-table = "7.2.1"
-config = { version = "0.15.16" }
console-subscriber = "0.4.1"
crc32fast = "1.5.0"
crossbeam = "0.8.4"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index e370afda..01537720 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -34,7 +34,6 @@ anymap2: 0.13.0, "Apache-2.0 OR MIT",
arbitrary: 1.4.2, "Apache-2.0 OR MIT",
arc-swap: 1.7.1, "Apache-2.0 OR MIT",
argon2: 0.5.3, "Apache-2.0 OR MIT",
-arraydeque: 0.5.1, "Apache-2.0 OR MIT",
arrayref: 0.3.9, "BSD-2-Clause",
arrayvec: 0.7.6, "Apache-2.0 OR MIT",
assert_cmd: 2.0.17, "Apache-2.0 OR MIT",
@@ -134,7 +133,6 @@ compact_str: 0.7.1, "MIT",
compression-codecs: 0.4.30, "Apache-2.0 OR MIT",
compression-core: 0.4.29, "Apache-2.0 OR MIT",
concurrent-queue: 2.5.0, "Apache-2.0 OR MIT",
-config: 0.15.16, "Apache-2.0 OR MIT",
console: 0.15.11, "MIT",
console-api: 0.8.1, "MIT",
console-subscriber: 0.4.1, "MIT",
@@ -218,7 +216,6 @@ enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
env_filter: 0.1.3, "Apache-2.0 OR MIT",
env_logger: 0.11.8, "Apache-2.0 OR MIT",
equivalent: 1.0.2, "Apache-2.0 OR MIT",
-erased-serde: 0.4.8, "Apache-2.0 OR MIT",
err_trail: 0.8.5, "Apache-2.0",
errno: 0.3.14, "Apache-2.0 OR MIT",
error_set: 0.8.5, "Apache-2.0",
@@ -393,7 +390,6 @@ jni: 0.21.1, "Apache-2.0 OR MIT",
jni-sys: 0.3.0, "Apache-2.0 OR MIT",
jobserver: 0.1.34, "Apache-2.0 OR MIT",
js-sys: 0.3.80, "Apache-2.0 OR MIT",
-json5: 0.4.1, "ISC",
jsonwebtoken: 9.3.1, "MIT",
jwalk: 0.8.1, "MIT",
keyring: 3.6.3, "Apache-2.0 OR MIT",
@@ -512,7 +508,6 @@ parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
passterm: 2.0.1, "BSD-3-Clause",
password-hash: 0.5.0, "Apache-2.0 OR MIT",
paste: 1.0.15, "Apache-2.0 OR MIT",
-pathdiff: 0.2.3, "Apache-2.0 OR MIT",
pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
pear: 0.2.9, "Apache-2.0 OR MIT",
pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
@@ -606,7 +601,6 @@ rkyv: 0.7.45, "MIT",
rkyv_derive: 0.7.45, "MIT",
rmcp: 0.6.4, "MIT",
rmcp-macros: 0.6.4, "MIT",
-ron: 0.8.1, "Apache-2.0 OR MIT",
route-recognizer: 0.3.1, "MIT",
rsa: 0.9.8, "Apache-2.0 OR MIT",
rust-ini: 0.21.3, "MIT",
@@ -643,7 +637,6 @@ security-framework: 3.4.0, "Apache-2.0 OR MIT",
security-framework-sys: 2.15.0, "Apache-2.0 OR MIT",
semver: 1.0.27, "Apache-2.0 OR MIT",
serde: 1.0.225, "Apache-2.0 OR MIT",
-serde-untagged: 0.1.9, "Apache-2.0 OR MIT",
serde-wasm-bindgen: 0.5.0, "MIT",
serde-wasm-bindgen: 0.6.5, "MIT",
serde_core: 1.0.225, "Apache-2.0 OR MIT",
@@ -769,7 +762,6 @@ try-lock: 0.2.5, "MIT",
twox-hash: 2.1.2, "MIT",
typed-builder: 0.15.2, "Apache-2.0 OR MIT",
typed-builder-macro: 0.15.2, "Apache-2.0 OR MIT",
-typeid: 1.0.3, "Apache-2.0 OR MIT",
typenum: 1.18.0, "Apache-2.0 OR MIT",
ucd-trie: 0.1.7, "Apache-2.0 OR MIT",
ulid: 1.2.1, "MIT",
@@ -894,7 +886,6 @@ wit-bindgen: 0.46.0, "Apache-2.0 OR Apache-2.0 WITH
LLVM-exception OR MIT",
writeable: 0.6.1, "Unicode-3.0",
wyz: 0.5.1, "MIT",
xattr: 1.5.1, "Apache-2.0 OR MIT",
-yaml-rust2: 0.10.4, "Apache-2.0 OR MIT",
yansi: 1.0.1, "Apache-2.0 OR MIT",
yasna: 0.5.2, "Apache-2.0 OR MIT",
yew: 0.21.0, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index 206dc351..68e733c1 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -29,10 +29,11 @@ readme = "README.md"
[dependencies]
axum = { workspace = true }
axum-server = { workspace = true }
-config = { workspace = true }
dotenvy = { workspace = true }
figlet-rs = { workspace = true }
+figment = { workspace = true }
iggy = { workspace = true }
+iggy_common = { workspace = true }
rmcp = { version = "0.6.4", features = [
"server",
"transport-io",
diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs
index c4327e72..1f2e6672 100644
--- a/core/ai/mcp/src/configs.rs
+++ b/core/ai/mcp/src/configs.rs
@@ -17,7 +17,13 @@
*/
use axum::http::Method;
+use figment::{
+ Metadata, Profile, Provider,
+ providers::{Format, Toml},
+ value::Dict,
+};
use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
+use iggy_common::{CustomEnvProvider, FileConfigProvider};
use serde::{Deserialize, Serialize};
use strum::Display;
use tower_http::cors::{AllowOrigin, CorsLayer};
@@ -177,3 +183,45 @@ pub fn configure_cors(config: &HttpCorsConfig) ->
CorsLayer {
.allow_credentials(config.allow_credentials)
.allow_private_network(config.allow_private_network)
}
+
+impl std::fmt::Display for McpServerConfig {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "McpServerConfig {{ http: {:?}, iggy: {:?}, permissions: {:?},
transport: {:?} }}",
+ self.http, self.iggy, self.permissions, self.transport
+ )
+ }
+}
+
+impl McpServerConfig {
+ pub fn config_provider(path: String) ->
FileConfigProvider<McpServerEnvProvider> {
+ let default_config =
Toml::string(include_str!("../../../ai/mcp/config.toml"));
+ FileConfigProvider::new(path, default_config,
McpServerEnvProvider::default())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct McpServerEnvProvider {
+ provider: CustomEnvProvider<McpServerConfig>,
+}
+
+impl Default for McpServerEnvProvider {
+ fn default() -> Self {
+ Self {
+ provider: CustomEnvProvider::new("IGGY_MCP_", &[]),
+ }
+ }
+}
+
+impl Provider for McpServerEnvProvider {
+ fn metadata(&self) -> Metadata {
+ Metadata::named("iggy-mcp-server-config")
+ }
+
+ fn data(&self) -> Result<figment::value::Map<Profile, Dict>,
figment::Error> {
+ self.provider.deserialize().map_err(|_| {
+ figment::Error::from("Cannot deserialize environment variables for
MCP config")
+ })
+ }
+}
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index b118720d..1633fe1d 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -16,12 +16,12 @@
* under the License.
*/
-use config::{Config, Environment, File};
use configs::{McpServerConfig, McpTransport};
use dotenvy::dotenv;
use error::McpRuntimeError;
use figlet_rs::FIGfont;
use iggy::prelude::{Client, Identifier};
+use iggy_common::ConfigProvider;
use rmcp::{ServiceExt, model::ErrorData, transport::stdio};
use service::IggyService;
use std::{env, sync::Arc};
@@ -68,14 +68,11 @@ async fn main() -> Result<(), McpRuntimeError> {
let config_path = config_path.unwrap_or_else(|_| "config".to_string());
eprintln!("Configuration file path: {config_path}");
- let config: McpServerConfig = Config::builder()
-
.add_source(Config::try_from(&McpServerConfig::default()).expect("Failed to
init config"))
- .add_source(File::with_name(&config_path).required(false))
- .add_source(Environment::with_prefix("IGGY_MCP").separator("_"))
- .build()
- .expect("Failed to build runtime config")
- .try_deserialize()
- .expect("Failed to deserialize runtime config");
+
+ let config: McpServerConfig = McpServerConfig::config_provider(config_path)
+ .load_config()
+ .await
+ .expect("Failed to load configuration");
let transport = config.transport;
if transport == McpTransport::Stdio {
diff --git a/core/common/src/configs/mod.rs b/core/common/src/configs/mod.rs
index c4cb8584..7a10d853 100644
--- a/core/common/src/configs/mod.rs
+++ b/core/common/src/configs/mod.rs
@@ -212,6 +212,19 @@ impl<T: Serialize + DeserializeOwned + Default + Display>
CustomEnvProvider<T> {
keys: &[String],
value: FigmentValue,
) {
+ if keys.is_empty() {
+ return;
+ }
+
+ // Try to detect HashMap patterns
+ if let Some(hashmap_result) =
+ Self::try_handle_hashmap_override(source, target, keys,
value.clone())
+ && hashmap_result
+ {
+ return;
+ }
+
+ // Fallback to original logic for non-HashMap patterns
let mut current_source = source;
let mut current_target = target;
let mut combined_keys = Vec::new();
@@ -249,6 +262,258 @@ impl<T: Serialize + DeserializeOwned + Default + Display>
CustomEnvProvider<T> {
}
}
+ fn try_handle_hashmap_override(
+ source: &Dict,
+ target: &mut Dict,
+ keys: &[String],
+ value: FigmentValue,
+ ) -> Option<bool> {
+ if keys.len() < 2 {
+ return Some(false);
+ }
+
+ let potential_hashmap_key = &keys[0];
+
+ // Check if this is actually a HashMap field in the source
+ let source_hashmap = match source.get(potential_hashmap_key) {
+ Some(FigmentValue::Dict(_, hashmap_dict)) => {
+ // For it to be a HashMap, it should either:
+ // 1. Have Dict values (actual HashMap entries)
+ // 2. Be empty (empty HashMap)
+ // 3. But NOT have regular struct fields
+
+ let has_dict_values = hashmap_dict
+ .values()
+ .any(|v| matches!(v, FigmentValue::Dict(_, _)));
+ let has_non_dict_values = hashmap_dict
+ .values()
+ .any(|v| !matches!(v, FigmentValue::Dict(_, _)));
+
+ // If it has non-Dict values mixed with Dict values, it's
probably a regular struct, not a HashMap
+ if has_non_dict_values && has_dict_values {
+ return Some(false);
+ }
+
+ // If it only has non-Dict values, it's definitely a regular
struct
+ if has_non_dict_values && !has_dict_values {
+ return Some(false);
+ }
+
+ // If it's empty or only has Dict values, it could be a HashMap
+ hashmap_dict
+ }
+ _ => return Some(false),
+ };
+
+ // Try to find the best HashMap entry key
+ if let Some((entry_key, remaining_keys)) =
+ Self::find_best_hashmap_split(source_hashmap, &keys[1..])
+ {
+ return Some(Self::apply_hashmap_override(
+ target,
+ potential_hashmap_key,
+ &entry_key,
+ &remaining_keys,
+ source_hashmap,
+ value,
+ ));
+ }
+
+ Some(false)
+ }
+
+ fn find_best_hashmap_split(
+ source_hashmap: &Dict,
+ keys: &[String],
+ ) -> Option<(String, Vec<String>)> {
+ if keys.is_empty() {
+ return None;
+ }
+
+ // First, try existing HashMap entry keys if any exist
+ if !source_hashmap.is_empty() {
+ for (existing_key, existing_value) in source_hashmap {
+ if let FigmentValue::Dict(_, entry_dict) = existing_value {
+ // Try different ways to match this existing key
+ for split_point in 1..=keys.len() {
+ let candidate_key = keys[0..split_point].join("_");
+
+ if candidate_key == *existing_key {
+ let remaining_keys = keys[split_point..].to_vec();
+
+ // Validate that the remaining keys form a valid
path in the entry
+ if remaining_keys.is_empty()
+ || Self::is_valid_field_path(entry_dict,
&remaining_keys)
+ {
+ return Some((existing_key.clone(),
remaining_keys));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // For empty HashMaps or when no existing keys match, use simple split
+ let simple_entry_key = keys[0].clone();
+ let simple_remaining = keys[1..].to_vec();
+ Some((simple_entry_key, simple_remaining))
+ }
+
+ fn is_valid_field_path(dict: &Dict, keys: &[String]) -> bool {
+ if keys.is_empty() {
+ return true;
+ }
+
+ // Try different combinations to account for fields with underscores
+ for split_point in 1..=keys.len() {
+ let field_name = keys[0..split_point].join("_");
+ let remaining_keys = &keys[split_point..];
+
+ if let Some(field_value) = dict.get(&field_name) {
+ if remaining_keys.is_empty() {
+ return true;
+ } else if let FigmentValue::Dict(_, nested_dict) = field_value
{
+ return Self::is_valid_field_path(nested_dict,
remaining_keys);
+ }
+ }
+ }
+
+ // Also try the original approach with combined keys
+ let mut current_dict = dict;
+ let mut combined_keys = Vec::new();
+
+ for key in keys {
+ combined_keys.push(key.clone());
+ let combined_field = combined_keys.join("_");
+
+ if let Some(field_value) = current_dict.get(&combined_field) {
+ match field_value {
+ FigmentValue::Dict(_, nested_dict) => {
+ current_dict = nested_dict;
+ combined_keys.clear();
+ }
+ _ => return true,
+ }
+ }
+ }
+
+ true
+ }
+
+ fn apply_hashmap_override(
+ target: &mut Dict,
+ hashmap_key: &str,
+ entry_key: &str,
+ remaining_keys: &[String],
+ source_hashmap: &Dict,
+ value: FigmentValue,
+ ) -> bool {
+ // Ensure the HashMap exists in target
+ target
+ .entry(hashmap_key.to_string())
+ .or_insert_with(|| FigmentValue::Dict(Tag::Default, Dict::new()));
+
+ if let Some(FigmentValue::Dict(_, target_hashmap)) =
target.get_mut(hashmap_key) {
+ // Ensure the specific entry exists in the HashMap
+ target_hashmap
+ .entry(entry_key.to_string())
+ .or_insert_with(|| {
+ if let Some(existing_entry) =
source_hashmap.get(entry_key) {
+ existing_entry.clone()
+ } else {
+ FigmentValue::Dict(Tag::Default, Dict::new())
+ }
+ });
+
+ if remaining_keys.is_empty() {
+ target_hashmap.insert(entry_key.to_string(), value);
+ } else if let Some(FigmentValue::Dict(_, entry_dict)) =
+ target_hashmap.get_mut(entry_key)
+ {
+ // Check if we need special handling for serde_json::Value
fields
+ if let Some((json_field, json_keys)) =
+ Self::find_json_value_field_split(entry_dict,
remaining_keys)
+ {
+ Self::handle_json_value_override(entry_dict, &json_field,
json_keys, value);
+ } else {
+ Self::insert_overridden_values_from_env(
+ &Dict::new(),
+ entry_dict,
+ remaining_keys.to_vec(),
+ value,
+ );
+ }
+ }
+ return true;
+ }
+
+ false
+ }
+
+ fn find_json_value_field_split(dict: &Dict, keys: &[String]) ->
Option<(String, Vec<String>)> {
+ if keys.is_empty() {
+ return None;
+ }
+
+ // Try different split points to find a field that exists
+ for split_point in 1..=keys.len() {
+ let potential_json_field = keys[0..split_point].join("_");
+ let remaining_keys = keys[split_point..].to_vec();
+
+ if dict.contains_key(&potential_json_field) &&
!remaining_keys.is_empty() {
+ return Some((potential_json_field, remaining_keys));
+ }
+ }
+
+ // If no existing field matches and we have multiple keys, use first
key
+ if keys.len() > 1 {
+ let potential_new_field = keys[0].clone();
+ let remaining_keys = keys[1..].to_vec();
+ return Some((potential_new_field, remaining_keys));
+ }
+
+ None
+ }
+
+ fn handle_json_value_override(
+ entry_dict: &mut Dict,
+ json_field: &str,
+ json_keys: Vec<String>,
+ value: FigmentValue,
+ ) {
+ // Ensure the JSON value field exists
+ entry_dict
+ .entry(json_field.to_string())
+ .or_insert_with(|| FigmentValue::Dict(Tag::Default, Dict::new()));
+
+ // Get or create the JSON field as a Dict
+ if let Some(json_value) = entry_dict.get_mut(json_field) {
+ let json_dict = match json_value {
+ FigmentValue::Dict(_, dict) => dict,
+ _ => {
+ *json_value = FigmentValue::Dict(Tag::Default,
Dict::new());
+ if let FigmentValue::Dict(_, dict) = json_value {
+ dict
+ } else {
+ return;
+ }
+ }
+ };
+
+ Self::set_nested_json_field(json_dict, &json_keys, value);
+ }
+ }
+
+ fn set_nested_json_field(dict: &mut Dict, keys: &[String], value:
FigmentValue) {
+ if keys.is_empty() {
+ return;
+ }
+
+ // Always try the full field name first (with underscores)
+ let full_field_name = keys.join("_");
+ dict.insert(full_field_name, value);
+ }
+
fn navigate_to_dict<'a>(target: &'a mut Dict, path: &[String]) ->
Option<&'a mut Dict> {
if path.is_empty() {
return Some(target);
diff --git a/core/connectors/runtime/Cargo.toml
b/core/connectors/runtime/Cargo.toml
index 46e1bb60..759f12a2 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -31,7 +31,6 @@ readme = "README.md"
[dependencies]
axum = { workspace = true }
axum-server = { workspace = true }
-config = { workspace = true }
dashmap = { workspace = true }
dlopen2 = { workspace = true }
dotenvy = { workspace = true }
diff --git a/core/connectors/runtime/config.toml
b/core/connectors/runtime/config.toml
index ea868f8e..f106cb5d 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -42,137 +42,3 @@ password = "iggy"
[state]
path = "local_state"
-
-[sinks.stdout]
-enabled = true
-name = "Stdout sink"
-path = "target/release/libiggy_connector_stdout_sink"
-
-[[sinks.stdout.streams]]
-stream = "example_stream"
-topics = ["example_topic"]
-schema = "json"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "stdout_sink_connector"
-
-[sinks.stdout.config]
-print_payload = false
-
-[sinks.stdout.transforms.add_fields]
-enabled = true
-
-[[sinks.stdout.transforms.add_fields.fields]]
-key = "message"
-value.static = "hello"
-
-[sources.random]
-enabled = true
-name = "Random source"
-path = "target/release/libiggy_connector_random_source"
-config_format = "json"
-
-[[sources.random.streams]]
-stream = "example_stream"
-topic = "example_topic"
-schema = "json"
-batch_length = 1000
-linger_time = "5ms"
-
-[sources.random.config]
-interval = "100ms"
-# max_count = 1000
-messages_range = [10, 50]
-payload_size = 200
-
-[sources.random.transforms.add_fields]
-enabled = true
-
-[[sources.random.transforms.add_fields.fields]]
-key = "test_field"
-value.static = "hello!"
-
-[sinks.quickwit]
-enabled = true
-name = "Quickwit sink 1"
-path = "target/release/libiggy_connector_quickwit_sink"
-config_format = "yaml"
-
-[[sinks.quickwit.streams]]
-stream = "qw"
-topics = ["records"]
-schema = "json"
-batch_length = 1000
-poll_interval = "5ms"
-consumer_group = "qw_sink_connector"
-
-[sinks.quickwit.transforms.add_fields]
-enabled = true
-
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "service_name"
-value.static = "qw_connector"
-
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "timestamp"
-value.computed = "timestamp_millis"
-
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "random_id"
-value.computed = "uuid_v7"
-
-[sinks.quickwit.transforms.delete_fields]
-enabled = true
-fields = ["email", "created_at"]
-
-[sinks.quickwit.config]
-url = "http://localhost:7280"
-index = """
-version: 0.9
-
-index_id: events
-
-doc_mapping:
- mode: strict
- field_mappings:
- - name: timestamp
- type: datetime
- input_formats: [unix_timestamp]
- output_format: unix_timestamp_nanos
- indexed: false
- fast: true
- fast_precision: milliseconds
- - name: service_name
- type: text
- tokenizer: raw
- fast: true
- - name: random_id
- type: text
- tokenizer: raw
- fast: true
- - name: user_id
- type: text
- tokenizer: raw
- fast: true
- - name: user_type
- type: u64
- fast: true
- - name: source
- type: text
- tokenizer: default
- - name: state
- type: text
- tokenizer: default
- - name: message
- type: text
- tokenizer: default
-
- timestamp_field: timestamp
-
-indexing_settings:
- commit_timeout_secs: 10
-
-retention:
- period: 7 days
- schedule: daily
-"""
diff --git a/core/connectors/runtime/config.toml
b/core/connectors/runtime/example_config.toml
similarity index 100%
copy from core/connectors/runtime/config.toml
copy to core/connectors/runtime/example_config.toml
diff --git a/core/connectors/runtime/src/configs.rs
b/core/connectors/runtime/src/configs.rs
index c7a5b41f..8974798d 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -125,11 +125,7 @@ impl std::fmt::Display for RuntimeConfig {
write!(
f,
"RuntimeConfig {{ http: {:?}, iggy: {:?}, sinks: {:?}, sources:
{:?}, state: {:?} }}",
- self.http_api,
- self.iggy,
- self.sinks.keys(),
- self.sources.keys(),
- self.state
+ self.http_api, self.iggy, self.sinks, self.sources, self.state
)
}
}
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index dd1f4f14..8e04f6f0 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -73,9 +73,12 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
return -1;
};
- let Ok(config) = serde_json::from_str(config_str) else {
- error!("Failed to parse configuration for sink connector with
ID: {id}",);
- return -1;
+ let config = match serde_json::from_str::<C>(config_str) {
+ Ok(cfg) => cfg,
+ Err(err) => {
+ error!("Failed to parse configuration for sink connector
with ID: {id}. {err}",);
+ return -1;
+ }
};
let mut sink = factory(id, config);
diff --git a/core/integration/src/test_connectors_runtime.rs
b/core/integration/src/test_connectors_runtime.rs
index 9a203fc3..00e419b9 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -28,8 +28,6 @@ use std::{collections::HashMap, net::TcpListener};
use tokio::time::sleep;
use uuid::Uuid;
-use crate::bench_utils::get_random_path;
-
const LOCAL_STATE_PREFIX: &str = "local_state_";
pub const CONSUMER_NAME: &str = "connectors";
@@ -71,7 +69,10 @@ impl TestConnectorsRuntime {
"IGGY_CONNECTORS_IGGY_CONSUMER".to_string(),
CONSUMER_NAME.to_string(),
);
- envs.insert("IGGY_CONNECTORS_STATE_PATH".to_string(),
get_random_path());
+ envs.insert(
+ "IGGY_CONNECTORS_STATE_PATH".to_string(),
+ Self::get_random_path(),
+ );
Self::create(envs, server_executable_path)
}