This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors_state in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d6109811b9a630850f7f8a6765c554ef9d42a45f Author: spetz <[email protected]> AuthorDate: Tue Jun 24 21:24:44 2025 +0200 Init work on connectors config --- Cargo.lock | 2 + Cargo.toml | 1 + core/connectors/README.md | 2 +- core/connectors/runtime/Cargo.toml | 2 + core/connectors/runtime/README.md | 2 +- core/connectors/runtime/src/main.rs | 100 ++++++++++------------------------ core/connectors/runtime/src/stream.rs | 70 ++++++++++++++++++++++++ core/server/Cargo.toml | 2 +- 8 files changed, 106 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa18adda..dcd04ccd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3776,6 +3776,8 @@ dependencies = [ "config", "dashmap", "dlopen2", + "dotenvy", + "figlet-rs", "flume", "futures", "iggy", diff --git a/Cargo.toml b/Cargo.toml index 1fa553c3..f910068b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ derive_more = { version = "2.0.1", features = ["full"] } derive-new = "0.7.0" dirs = "6.0.0" dlopen2 = "0.8.0" +dotenvy = "0.15.7" enum_dispatch = "0.3.13" figlet-rs = "0.1.5" flume = "0.11.1" diff --git a/core/connectors/README.md b/core/connectors/README.md index f7dd95dc..8bb81bec 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -21,7 +21,7 @@ The highly performant and modular runtime for statically typed, yet dynamically 2. Run `docker compose up -d` from `/examples/rust/src/sink-data-producer` which will start the Quickwit server to be used by an example sink connector. At this point, you can access the Quickwit UI at [http://localhost:7280](http://localhost:7280) - check this dashboard again later on, after the `events` index will be created. -3. Set environment variable `IGGY_CONNECTORS_RUNTIME_CONFIG_PATH=core/connectors/runtime/config` (adjust the path as needed) pointing to the runtime configuration file. +3. Set environment variable `IGGY_CONNECTORS_CONFIG_PATH=core/connectors/runtime/config` (adjust the path as needed) pointing to the runtime configuration file. 4. Start the Iggy server and invoke the following commands via Iggy CLI to create the example streams and topics used by the sample connectors. diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index c3c3e15c..84912851 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -34,6 +34,8 @@ axum-server = { workspace = true } config = { workspace = true } dashmap = { workspace = true } dlopen2 = { workspace = true } +dotenvy = { workspace = true } +figlet-rs = { workspace = true } flume = { workspace = true } futures = { workspace = true } iggy = { workspace = true } diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md index 18da28d5..ce5b2a82 100644 --- a/core/connectors/runtime/README.md +++ b/core/connectors/runtime/README.md @@ -22,7 +22,7 @@ password = "iggy" All the other config sections start either with `sources` or `sinks` depending on the connector type. -Keep in mind that either of `toml`, `yaml`, or `json` formats are supported for the configuration file. The path to the configuration can be overriden by `IGGY_CONNECTORS_RUNTIME_CONFIG_PATH` environment variable. Each configuration section can be also additionally updated by using the following convention `IGGY_CONNECTORS_SECTION_NAME.KEY_NAME` e.g. `IGGY_CONNECTORS_IGGY_USERNAME` and so on. +Keep in mind that either of `toml`, `yaml`, or `json` formats are supported for the configuration file. The path to the configuration can be overriden by `IGGY_CONNECTORS_CONFIG_PATH` environment variable. Each configuration section can be also additionally updated by using the following convention `IGGY_CONNECTORS_SECTION_NAME.KEY_NAME` e.g. `IGGY_CONNECTORS_IGGY_USERNAME` and so on. ## HTTP API diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 67bf303a..9da57d52 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -19,8 +19,10 @@ use config::{Config, Environment, File}; use configs::{ConfigFormat, RuntimeConfig}; use dlopen2::wrapper::{Container, WrapperApi}; +use dotenvy::dotenv; use error::RuntimeError; -use iggy::prelude::{Client, IggyClient, IggyClientBuilder, IggyConsumer, IggyProducer}; +use figlet_rs::FIGfont; +use iggy::prelude::{Client, IggyConsumer, IggyProducer}; use iggy_connector_sdk::{ StreamDecoder, StreamEncoder, sink::ConsumeCallback, @@ -33,7 +35,7 @@ use std::{ env, sync::{Arc, atomic::AtomicU32}, }; -use tracing::{debug, error, info}; +use tracing::{debug, info}; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; mod api; @@ -43,6 +45,7 @@ pub(crate) mod error; mod manager; mod sink; mod source; +mod stream; mod transform; #[global_allocator] @@ -77,13 +80,29 @@ struct SinkApi { #[tokio::main] async fn main() -> Result<(), RuntimeError> { + let standard_font = FIGfont::standard().unwrap(); + let figure = standard_font.convert("Iggy Connectors"); + println!("{}", figure.unwrap()); + + if let Ok(env_path) = std::env::var("IGGY_CONNECTORS_ENV_PATH") { + if dotenvy::from_path(&env_path).is_ok() { + println!("Loaded environment variables from path: {env_path}"); + } + } else if let Ok(path) = dotenv() { + println!( + "Loaded environment variables from .env file at path: {}", + path.display() + ); + } + Registry::default() .with(tracing_subscriber::fmt::layer()) .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .init(); + let config_path = - env::var("IGGY_CONNECTORS_RUNTIME_CONFIG_PATH").unwrap_or_else(|_| "config".to_string()); - info!("Starting Iggy Connector Runtime, loading configuration from: {config_path}..."); + env::var("IGGY_CONNECTORS_CONFIG_PATH").unwrap_or_else(|_| "config".to_string()); + info!("Starting Iggy Connectors Runtime, loading configuration from: {config_path}..."); let builder = Config::builder() .add_source(File::with_name(&config_path)) .add_source(Environment::with_prefix("IGGY_CONNECTORS").separator("_")); @@ -94,30 +113,9 @@ async fn main() -> Result<(), RuntimeError> { .try_deserialize() .expect("Failed to deserialize runtime config"); - let iggy_config = config.iggy.clone(); - let iggy_address = iggy_config.address; - let iggy_username = iggy_config.username; - let iggy_password = iggy_config.password; - let iggy_token = iggy_config.token; - let consumer_client = create_iggy_client( - &iggy_address, - iggy_username.as_deref(), - iggy_password.as_deref(), - iggy_token.as_deref(), - ) - .await?; - consumer_client.connect().await?; - let producer_client = create_iggy_client( - &iggy_address, - iggy_username.as_deref(), - iggy_password.as_deref(), - iggy_token.as_deref(), - ) - .await?; - producer_client.connect().await?; - - let sources = source::init(config.sources.clone(), &producer_client).await?; - let sinks = sink::init(config.sinks.clone(), &consumer_client).await?; + let (stream_consumer, stream_producer) = stream::init(config.iggy.clone()).await?; + let sources = source::init(config.sources.clone(), &stream_producer).await?; + let sinks = sink::init(config.sinks.clone(), &stream_consumer).await?; let mut sink_wrappers = vec![]; let mut sink_with_plugins = HashMap::new(); @@ -196,8 +194,8 @@ async fn main() -> Result<(), RuntimeError> { } } - producer_client.shutdown().await?; - consumer_client.shutdown().await?; + stream_producer.shutdown().await?; + stream_consumer.shutdown().await?; info!("All connectors closed. Runtime shutdown complete."); Ok(()) @@ -220,48 +218,6 @@ pub fn resolve_plugin_path(path: &str) -> String { } } -async fn create_iggy_client( - address: &str, - username: Option<&str>, - password: Option<&str>, - token: Option<&str>, -) -> Result<IggyClient, RuntimeError> { - let connection_string = if let Some(token) = token { - if token.is_empty() { - error!("Iggy token cannot be empty (if username and password are not provided)"); - return Err(RuntimeError::MissingIggyCredentials); - } - - let redacted_token = token.chars().take(3).collect::<String>(); - info!("Using token: {redacted_token}*** for Iggy authentication"); - format!("iggy://{token}@{address}") - } else { - info!("Using username and password for Iggy authentication"); - let username = username.ok_or(RuntimeError::MissingIggyCredentials)?; - if username.is_empty() { - error!("Iggy password cannot be empty (if token is not provided)"); - return Err(RuntimeError::MissingIggyCredentials); - } - - let password = password.ok_or(RuntimeError::MissingIggyCredentials)?; - if password.is_empty() { - error!("Iggy password cannot be empty (if token is not provided)"); - return Err(RuntimeError::MissingIggyCredentials); - } - - let redacted_username = username.chars().take(3).collect::<String>(); - let redacted_password = password.chars().take(3).collect::<String>(); - info!( - "Using username: {redacted_username}***, password: {redacted_password}*** for Iggy authentication" - ); - format!("iggy://{username}:{password}@{address}") - }; - - let client = IggyClientBuilder::from_connection_string(&connection_string)?.build()?; - client.connect().await?; - Ok(client) -} - struct SinkConnector { container: Container<SinkApi>, plugins: Vec<SinkConnectorPlugin>, diff --git a/core/connectors/runtime/src/stream.rs b/core/connectors/runtime/src/stream.rs new file mode 100644 index 00000000..f67afd31 --- /dev/null +++ b/core/connectors/runtime/src/stream.rs @@ -0,0 +1,70 @@ +use iggy::prelude::{Client, IggyClient, IggyClientBuilder}; +use tracing::{error, info}; + +use crate::{configs::IggyConfig, error::RuntimeError}; + +pub async fn init(config: IggyConfig) -> Result<(IggyClient, IggyClient), RuntimeError> { + let iggy_address = config.address; + let iggy_username = config.username; + let iggy_password = config.password; + let iggy_token = config.token; + let consumer_client = create_client( + &iggy_address, + iggy_username.as_deref(), + iggy_password.as_deref(), + iggy_token.as_deref(), + ) + .await?; + consumer_client.connect().await?; + let producer_client = create_client( + &iggy_address, + iggy_username.as_deref(), + iggy_password.as_deref(), + iggy_token.as_deref(), + ) + .await?; + producer_client.connect().await?; + Ok((consumer_client, producer_client)) +} + +async fn create_client( + address: &str, + username: Option<&str>, + password: Option<&str>, + token: Option<&str>, +) -> Result<IggyClient, RuntimeError> { + let connection_string = if let Some(token) = token { + if token.is_empty() { + error!("Iggy token cannot be empty (if username and password are not provided)"); + return Err(RuntimeError::MissingIggyCredentials); + } + + let redacted_token = token.chars().take(3).collect::<String>(); + info!("Using token: {redacted_token}*** for Iggy authentication"); + format!("iggy://{token}@{address}") + } else { + info!("Using username and password for Iggy authentication"); + let username = username.ok_or(RuntimeError::MissingIggyCredentials)?; + if username.is_empty() { + error!("Iggy password cannot be empty (if token is not provided)"); + return Err(RuntimeError::MissingIggyCredentials); + } + + let password = password.ok_or(RuntimeError::MissingIggyCredentials)?; + if password.is_empty() { + error!("Iggy password cannot be empty (if token is not provided)"); + return Err(RuntimeError::MissingIggyCredentials); + } + + let redacted_username = username.chars().take(3).collect::<String>(); + let redacted_password = password.chars().take(3).collect::<String>(); + info!( + "Using username: {redacted_username}***, password: {redacted_password}*** for Iggy authentication" + ); + format!("iggy://{username}:{password}@{address}") + }; + + let client = IggyClientBuilder::from_connection_string(&connection_string)?.build()?; + client.connect().await?; + Ok(client) +} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index b0e46455..3a0cf94e 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -50,7 +50,7 @@ console-subscriber = { workspace = true, optional = true } crossbeam = { workspace = true } dashmap = { workspace = true } derive_more = { workspace = true } -dotenvy = "0.15.7" +dotenvy = { workspace = true } enum_dispatch = { workspace = true } error_set = { version = "0.8.5", features = ["tracing"] } figlet-rs = { workspace = true }
