This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_test
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_test by this push:
     new aa61e536 Improve loading configs for connectors and MCP, add initial 
e2e suite for connectors
aa61e536 is described below

commit aa61e5364d02bf38ac1334922c6a5c440e2c9654
Author: spetz <[email protected]>
AuthorDate: Fri Sep 19 23:35:17 2025 +0200

    Improve loading configs for connectors and MCP, add initial e2e suite for 
connectors
---
 Cargo.lock                                         | 327 ++++++++++++++++++++-
 core/ai/mcp/src/error.rs                           |   2 +
 core/ai/mcp/src/main.rs                            |  17 +-
 core/connectors/runtime/README.md                  |   8 +-
 core/connectors/runtime/src/error.rs               |   2 +
 core/connectors/runtime/src/main.rs                |  17 +-
 core/integration/Cargo.toml                        |   1 +
 core/integration/src/test_connectors_runtime.rs    |   7 +-
 core/integration/tests/connectors/mod.rs           |  88 +++++-
 .../tests/connectors/sinks/mod.rs}                 |  22 +-
 .../tests/connectors/sinks/postgres/mod.rs         |  66 +++++
 .../tests/connectors/sinks/postgres/postgres.toml  |  39 +++
 .../tests/connectors/sources/mod.rs}               |  22 +-
 .../tests/connectors/sources/postgres/mod.rs}      |  22 --
 14 files changed, 551 insertions(+), 89 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4a52e543..75de1710 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1040,6 +1040,82 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "bollard"
+version = "0.19.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "899ca34eb6924d6ec2a77c6f7f5c7339e60fd68235eaf91edd5a15f12958bb06"
+dependencies = [
+ "async-stream",
+ "base64 0.22.1",
+ "bitflags 2.9.4",
+ "bollard-buildkit-proto",
+ "bollard-stubs",
+ "bytes",
+ "chrono",
+ "futures-core",
+ "futures-util",
+ "hex",
+ "home",
+ "http 1.3.1",
+ "http-body-util",
+ "hyper",
+ "hyper-named-pipe",
+ "hyper-rustls",
+ "hyper-util",
+ "hyperlocal",
+ "log",
+ "num",
+ "pin-project-lite",
+ "rand 0.9.2",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "serde_repr",
+ "serde_urlencoded",
+ "thiserror 2.0.16",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tonic 0.13.1",
+ "tower-service",
+ "url",
+ "winapi",
+]
+
+[[package]]
+name = "bollard-buildkit-proto"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "40b3e79f8bd0f25f32660e3402afca46fd91bebaf135af017326d905651f8107"
+dependencies = [
+ "prost 0.13.5",
+ "prost-types 0.13.5",
+ "tonic 0.13.1",
+ "ureq",
+]
+
+[[package]]
+name = "bollard-stubs"
+version = "1.48.3-rc.28.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "64ea257e555d16a2c01e5593f40b73865cdf12efbceda33c6d14a2d8d1490368"
+dependencies = [
+ "base64 0.22.1",
+ "bollard-buildkit-proto",
+ "bytes",
+ "chrono",
+ "prost 0.13.5",
+ "serde",
+ "serde_json",
+ "serde_repr",
+ "serde_with",
+]
+
 [[package]]
 name = "bon"
 version = "3.7.2"
@@ -2192,6 +2268,17 @@ version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
 
+[[package]]
+name = "docker_credential"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8"
+dependencies = [
+ "base64 0.21.7",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "document-features"
 version = "0.2.11"
@@ -2394,6 +2481,17 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "etcetera"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6"
+dependencies = [
+ "cfg-if",
+ "home",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "event-listener"
 version = "5.4.1"
@@ -2498,6 +2596,18 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "filetime"
+version = "0.2.26"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bc0505cd1b6fa6580283f6bdf70a73fcf4aba1184038c90902b92b3dd0df63ed"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "libredox",
+ "windows-sys 0.60.2",
+]
+
 [[package]]
 name = "find-msvc-tools"
 version = "0.1.1"
@@ -3592,6 +3702,21 @@ dependencies = [
  "want",
 ]
 
+[[package]]
+name = "hyper-named-pipe"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278"
+dependencies = [
+ "hex",
+ "hyper",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+ "winapi",
+]
+
 [[package]]
 name = "hyper-rustls"
 version = "0.27.7"
@@ -3648,6 +3773,21 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "hyperlocal"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7"
+dependencies = [
+ "hex",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+]
+
 [[package]]
 name = "iana-time-zone"
 version = "0.1.64"
@@ -4276,6 +4416,7 @@ dependencies = [
  "server",
  "tempfile",
  "test-case",
+ "testcontainers-modules",
  "tokio",
  "twox-hash",
  "uuid",
@@ -5127,6 +5268,20 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "num"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23"
+dependencies = [
+ "num-bigint",
+ "num-complex",
+ "num-integer",
+ "num-iter",
+ "num-rational",
+ "num-traits",
+]
+
 [[package]]
 name = "num-bigint"
 version = "0.4.6"
@@ -5154,6 +5309,15 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "num-complex"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "num-conv"
 version = "0.1.0"
@@ -5195,6 +5359,17 @@ dependencies = [
  "num-modular",
 ]
 
+[[package]]
+name = "num-rational"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
+dependencies = [
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "num-traits"
 version = "0.2.19"
@@ -5529,6 +5704,31 @@ dependencies = [
  "windows-targets 0.52.6",
 ]
 
+[[package]]
+name = "parse-display"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a"
+dependencies = [
+ "parse-display-derive",
+ "regex",
+ "regex-syntax 0.8.6",
+]
+
+[[package]]
+name = "parse-display-derive"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "regex",
+ "regex-syntax 0.8.6",
+ "structmeta",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "passterm"
 version = "2.0.1"
@@ -6318,6 +6518,15 @@ dependencies = [
  "bitflags 1.3.2",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.5.17"
@@ -7076,6 +7285,17 @@ dependencies = [
  "serde_core",
 ]
 
+[[package]]
+name = "serde_repr"
+version = "0.1.20"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "serde_spanned"
 version = "0.6.9"
@@ -7596,7 +7816,7 @@ dependencies = [
  "chrono",
  "crc",
  "dotenvy",
- "etcetera",
+ "etcetera 0.8.0",
  "futures-channel",
  "futures-core",
  "futures-util",
@@ -7704,6 +7924,29 @@ version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
 
+[[package]]
+name = "structmeta"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "structmeta-derive",
+ "syn 2.0.106",
+]
+
+[[package]]
+name = "structmeta-derive"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "strum"
 version = "0.27.2"
@@ -7907,6 +8150,45 @@ dependencies = [
  "test-case-core",
 ]
 
+[[package]]
+name = "testcontainers"
+version = "0.25.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b92bce247dc9260a19808321e11b51ea6a0293d02b48ab1c6578960610cfa2a7"
+dependencies = [
+ "async-trait",
+ "bollard",
+ "bollard-stubs",
+ "bytes",
+ "docker_credential",
+ "either",
+ "etcetera 0.10.0",
+ "futures",
+ "log",
+ "memchr",
+ "parse-display",
+ "pin-project-lite",
+ "serde",
+ "serde_json",
+ "serde_with",
+ "thiserror 2.0.16",
+ "tokio",
+ "tokio-stream",
+ "tokio-tar",
+ "tokio-util",
+ "ulid",
+ "url",
+]
+
+[[package]]
+name = "testcontainers-modules"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1966329d5bb3f89d33602d2db2da971fb839f9297dad16527abf4564e2ae0a6d"
+dependencies = [
+ "testcontainers",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.16.2"
@@ -8086,6 +8368,21 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-tar"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75"
+dependencies = [
+ "filetime",
+ "futures-core",
+ "libc",
+ "redox_syscall 0.3.5",
+ "tokio",
+ "tokio-stream",
+ "xattr",
+]
+
 [[package]]
 name = "tokio-util"
 version = "0.7.16"
@@ -8240,8 +8537,10 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
 dependencies = [
  "async-trait",
+ "axum 0.8.4",
  "base64 0.22.1",
  "bytes",
+ "h2 0.4.12",
  "http 1.3.1",
  "http-body",
  "http-body-util",
@@ -8251,6 +8550,7 @@ dependencies = [
  "percent-encoding",
  "pin-project",
  "prost 0.13.5",
+ "socket2 0.5.10",
  "tokio",
  "tokio-stream",
  "tower 0.5.2",
@@ -8602,6 +8902,21 @@ version = "0.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
 
+[[package]]
+name = "ureq"
+version = "2.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d"
+dependencies = [
+ "base64 0.22.1",
+ "log",
+ "once_cell",
+ "rustls",
+ "rustls-pki-types",
+ "url",
+ "webpki-roots 0.26.11",
+]
+
 [[package]]
 name = "url"
 version = "2.5.7"
@@ -9521,6 +9836,16 @@ dependencies = [
  "tap",
 ]
 
+[[package]]
+name = "xattr"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "af3a19837351dc82ba89f8a125e22a3c475f05aba604acc023d62b2739ae2909"
+dependencies = [
+ "libc",
+ "rustix",
+]
+
 [[package]]
 name = "yaml-rust2"
 version = "0.10.4"
diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs
index 0344cfac..0a8c0363 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/ai/mcp/src/error.rs
@@ -36,4 +36,6 @@ pub enum McpRuntimeError {
     FailedToCreateConsumerId,
     #[error("Invalid API path")]
     InvalidApiPath,
+    #[error("Configuration not found: {0}")]
+    ConfigurationNotFound(String),
 }
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index 1337ce0b..b118720d 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -51,7 +51,22 @@ async fn main() -> Result<(), McpRuntimeError> {
         );
     }
 
-    let config_path = env::var("IGGY_MCP_CONFIG_PATH").unwrap_or_else(|_| 
"config".to_string());
+    let config_path = env::var("IGGY_MCP_CONFIG_PATH");
+    if let Ok(ref path) = config_path {
+        let config_with_extension = if path.contains('.') {
+            path.to_owned()
+        } else {
+            format!("{path}.toml")
+        };
+        eprintln!("Checking if config path exists: 
{config_with_extension}...");
+        if !std::fs::exists(&config_with_extension).unwrap_or_default() {
+            return Err(McpRuntimeError::ConfigurationNotFound(
+                config_with_extension,
+            ));
+        }
+    }
+
+    let config_path = config_path.unwrap_or_else(|_| "config".to_string());
     eprintln!("Configuration file path: {config_path}");
     let config: McpServerConfig = Config::builder()
         
.add_source(Config::try_from(&McpServerConfig::default()).expect("Failed to 
init config"))
diff --git a/core/connectors/runtime/README.md 
b/core/connectors/runtime/README.md
index bee66d27..34019292 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -33,15 +33,15 @@ Keep in mind that either of `toml`, `yaml`, or `json` 
formats are supported for
 
 ## HTTP API
 
-Connector runtime has an optional HTTP API that can be enabled by setting the 
`enabled` flag to `true` in the `[http_api]` section.
+Connector runtime has an optional HTTP API that can be enabled by setting the 
`enabled` flag to `true` in the `[http]` section.
 
 ```toml
-[http_api] # Optional HTTP API configuration
+[http] # Optional HTTP API configuration
 enabled = true
 address = "127.0.0.1:8081"
 # api_key = "secret" # Optional API key for authentication to be passed as 
`api-key` header
 
-[http_api.cors] # Optional CORS configuration for HTTP API
+[http.cors] # Optional CORS configuration for HTTP API
 enabled = false
 allowed_methods = ["GET", "POST", "PUT", "DELETE"]
 allowed_origins = ["*"]
@@ -50,7 +50,7 @@ exposed_headers = [""]
 allow_credentials = false
 allow_private_network = false
 
-[http_api.tls] # Optional TLS configuration for HTTP API
+[http.tls] # Optional TLS configuration for HTTP API
 enabled = false
 cert_file = "core/certs/iggy_cert.pem"
 key_file = "core/certs/iggy_key.pem"
diff --git a/core/connectors/runtime/src/error.rs 
b/core/connectors/runtime/src/error.rs
index c622feab..f8674df8 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -22,6 +22,8 @@ use thiserror::Error;
 pub enum RuntimeError {
     #[error("Invalid configuration: {0}")]
     InvalidConfiguration(String),
+    #[error("Configuration not found: {0}")]
+    ConfigurationNotFound(String),
     #[error("Failed to serialize topic metadata")]
     FailedToSerializeTopicMetadata,
     #[error("Failed to serialize messages metadata")]
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index 7b5d88a5..4ca3369e 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -108,9 +108,22 @@ async fn main() -> Result<(), RuntimeError> {
         
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
         .init();
 
-    let config_path =
-        env::var("IGGY_CONNECTORS_CONFIG_PATH").unwrap_or_else(|_| 
"config".to_string());
+    let config_path = env::var("IGGY_CONNECTORS_CONFIG_PATH");
+    if let Ok(ref path) = config_path {
+        let config_with_extension = if path.contains('.') {
+            path.to_owned()
+        } else {
+            format!("{path}.toml")
+        };
+        info!("Checking if config path exists: {config_with_extension}...");
+        if !std::fs::exists(&config_with_extension).unwrap_or_default() {
+            return 
Err(RuntimeError::ConfigurationNotFound(config_with_extension));
+        }
+    }
+
+    let config_path = config_path.unwrap_or_else(|_| "config".to_string());
     info!("Starting Iggy Connectors Runtime, loading configuration from: 
{config_path}...");
+
     let config: RuntimeConfig = Config::builder()
         .add_source(Config::try_from(&RuntimeConfig::default()).expect("Failed 
to init config"))
         .add_source(File::with_name(&config_path).required(false))
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 61be6918..5e86a564 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -58,6 +58,7 @@ serial_test = { workspace = true }
 server = { workspace = true }
 tempfile = { workspace = true }
 test-case = { workspace = true }
+testcontainers-modules = { version = "0.13.0", features = ["postgres"] }
 tokio = { workspace = true }
 twox-hash = { workspace = true }
 uuid = { workspace = true }
diff --git a/core/integration/src/test_connectors_runtime.rs 
b/core/integration/src/test_connectors_runtime.rs
index bf175eca..d28f67a7 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -40,8 +40,11 @@ pub struct TestConnectorsRuntime {
 }
 
 impl TestConnectorsRuntime {
-    pub fn with_iggy_address(iggy_tcp_server_address: &str) -> Self {
-        Self::new(iggy_tcp_server_address, None, None)
+    pub fn with_iggy_address(
+        iggy_tcp_server_address: &str,
+        envs: Option<HashMap<String, String>>,
+    ) -> Self {
+        Self::new(iggy_tcp_server_address, envs, None)
     }
 
     pub fn new(
diff --git a/core/integration/tests/connectors/mod.rs 
b/core/integration/tests/connectors/mod.rs
index 2689cdae..789ca3ed 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -16,40 +16,98 @@
  * under the License.
  */
 
+use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, IggyClient};
+use iggy_binary_protocol::UserClient;
 use integration::{
+    tcp_client::TcpClientFactory,
     test_connectors_runtime::TestConnectorsRuntime,
-    test_server::{IpAddrKind, TestServer},
+    test_server::{ClientFactory, IpAddrKind, TestServer},
 };
 use serial_test::parallel;
 use std::collections::HashMap;
 
+mod sinks;
+mod sources;
+
 #[tokio::test]
 #[parallel]
 async fn connectors_runtime_should_start() {
-    setup().await;
+    let mut infra = setup();
+    infra.start_connectors_runtime(None, None).await;
 }
 
-async fn setup() -> ConnectorsInfra {
+fn setup() -> ConnectorsInfra {
     let mut iggy_envs = HashMap::new();
     iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned());
     let mut test_server = TestServer::new(Some(iggy_envs), true, None, 
IpAddrKind::V4);
     test_server.start();
-    let iggy_server_address = test_server
-        .get_raw_tcp_addr()
-        .expect("Failed to get Iggy TCP address");
-
-    let mut connectors_runtime = 
TestConnectorsRuntime::with_iggy_address(&iggy_server_address);
-    connectors_runtime.start();
-    connectors_runtime.ensure_started().await;
-
     ConnectorsInfra {
-        _iggy_server: test_server,
-        _connectors_runtime: connectors_runtime,
+        iggy_server: test_server,
+        connectors_runtim: None,
     }
 }
 
 #[derive(Debug)]
 struct ConnectorsInfra {
-    _iggy_server: TestServer,
-    _connectors_runtime: TestConnectorsRuntime,
+    iggy_server: TestServer,
+    connectors_runtim: Option<TestConnectorsRuntime>,
+}
+
+impl ConnectorsInfra {
+    pub async fn start_connectors_runtime(
+        &mut self,
+        config_path: Option<&str>,
+        envs: Option<HashMap<String, String>>,
+    ) {
+        if self.connectors_runtim.is_some() {
+            return;
+        }
+
+        let mut all_envs = None;
+        if let Some(config_path) = config_path {
+            let config_path = format!("tests/connectors/{config_path}");
+            let mut map = HashMap::new();
+            map.insert(
+                "IGGY_CONNECTORS_CONFIG_PATH".to_owned(),
+                config_path.to_owned(),
+            );
+            all_envs = Some(map);
+        }
+
+        if let Some(envs) = envs {
+            for (k, v) in envs {
+                all_envs
+                    .get_or_insert_with(HashMap::new)
+                    .insert(k.to_owned(), v.to_owned());
+            }
+        }
+
+        let iggy_server_address = self
+            .iggy_server
+            .get_raw_tcp_addr()
+            .expect("Failed to get Iggy TCP address");
+        let mut connectors_runtime =
+            TestConnectorsRuntime::with_iggy_address(&iggy_server_address, 
all_envs);
+        connectors_runtime.start();
+        connectors_runtime.ensure_started().await;
+        self.connectors_runtim = Some(connectors_runtime);
+    }
+
+    pub async fn create_client(&self) -> IggyClient {
+        let server_addr = self
+            .iggy_server
+            .get_raw_tcp_addr()
+            .expect("Failed to get Iggy TCP address");
+        let client = TcpClientFactory {
+            server_addr,
+            ..Default::default()
+        }
+        .create_client()
+        .await;
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .expect("Failed to login as root user");
+        IggyClient::create(client, None, None)
+    }
 }
diff --git a/core/ai/mcp/src/error.rs 
b/core/integration/tests/connectors/sinks/mod.rs
similarity index 55%
copy from core/ai/mcp/src/error.rs
copy to core/integration/tests/connectors/sinks/mod.rs
index 0344cfac..9c64d589 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/integration/tests/connectors/sinks/mod.rs
@@ -16,24 +16,4 @@
  * under the License.
  */
 
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum McpRuntimeError {
-    #[error("Failed to create service")]
-    FailedToCreateService,
-    #[error("Missing configuration")]
-    MissingConfig,
-    #[error("Failed to start HTTP server")]
-    FailedToStartHttpServer,
-    #[error("Iggy client error")]
-    IggyClient(#[from] iggy::prelude::ClientError),
-    #[error("Iggy error")]
-    IggyError(#[from] iggy::prelude::IggyError),
-    #[error("Missing Iggy credentials")]
-    MissingIggyCredentials,
-    #[error("Failed to create Iggy consumer ID")]
-    FailedToCreateConsumerId,
-    #[error("Invalid API path")]
-    InvalidApiPath,
-}
+mod postgres;
diff --git a/core/integration/tests/connectors/sinks/postgres/mod.rs 
b/core/integration/tests/connectors/sinks/postgres/mod.rs
new file mode 100644
index 00000000..8ab8fc8e
--- /dev/null
+++ b/core/integration/tests/connectors/sinks/postgres/mod.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::setup;
+use iggy_binary_protocol::{StreamClient, TopicClient};
+use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
+use std::collections::HashMap;
+use testcontainers_modules::{postgres, testcontainers::runners::AsyncRunner};
+
+#[tokio::test]
+async fn given_valid_configuration_postgres_sink_should_start() {
+    let container = postgres::Postgres::default()
+        .start()
+        .await
+        .expect("Failed to start Postgres");
+    let host_port = container
+        .get_host_port_ipv4(5432)
+        .await
+        .expect("Failed to get Postgres port");
+
+    let mut envs = HashMap::new();
+    envs.insert(
+        "IGGY_CONNECTORS_SINKS_POSTGRES_CONFIG_CONNECTION_STRING".to_owned(),
+        format!("postgres://postgres:postgres@localhost:{host_port}"),
+    );
+    let mut infra = setup();
+    let client = infra.create_client().await;
+    let stream_name = "test";
+    let topic_name = "test";
+    client
+        .create_stream("test", None)
+        .await
+        .expect("Failed to create stream");
+    let stream_id: Identifier = stream_name.try_into().expect("Invalid stream 
name");
+    client
+        .create_topic(
+            &stream_id,
+            topic_name,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            None,
+            IggyExpiry::ServerDefault,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .expect("Failed to create topic");
+    infra
+        .start_connectors_runtime(Some("sinks/postgres/postgres.toml"), 
Some(envs))
+        .await;
+}
diff --git a/core/integration/tests/connectors/sinks/postgres/postgres.toml 
b/core/integration/tests/connectors/sinks/postgres/postgres.toml
new file mode 100644
index 00000000..3dc2b1de
--- /dev/null
+++ b/core/integration/tests/connectors/sinks/postgres/postgres.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[sinks.postgres]
+enabled = true
+name = "Postgres sink"
+path = "../../target/debug/libiggy_connector_postgres_sink"
+
+[[sinks.postgres.streams]]
+stream = "test"
+topics = ["test"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "test"
+
+[sinks.postgres.config]
+connection_string = ""
+target_table = "iggy_messages"
+batch_size = 100
+max_connections = 10
+auto_create_table = true
+include_metadata = true
+include_checksum = true
+include_origin_timestamp = true
diff --git a/core/ai/mcp/src/error.rs 
b/core/integration/tests/connectors/sources/mod.rs
similarity index 55%
copy from core/ai/mcp/src/error.rs
copy to core/integration/tests/connectors/sources/mod.rs
index 0344cfac..9c64d589 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/integration/tests/connectors/sources/mod.rs
@@ -16,24 +16,4 @@
  * under the License.
  */
 
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum McpRuntimeError {
-    #[error("Failed to create service")]
-    FailedToCreateService,
-    #[error("Missing configuration")]
-    MissingConfig,
-    #[error("Failed to start HTTP server")]
-    FailedToStartHttpServer,
-    #[error("Iggy client error")]
-    IggyClient(#[from] iggy::prelude::ClientError),
-    #[error("Iggy error")]
-    IggyError(#[from] iggy::prelude::IggyError),
-    #[error("Missing Iggy credentials")]
-    MissingIggyCredentials,
-    #[error("Failed to create Iggy consumer ID")]
-    FailedToCreateConsumerId,
-    #[error("Invalid API path")]
-    InvalidApiPath,
-}
+mod postgres;
diff --git a/core/ai/mcp/src/error.rs 
b/core/integration/tests/connectors/sources/postgres/mod.rs
similarity index 55%
copy from core/ai/mcp/src/error.rs
copy to core/integration/tests/connectors/sources/postgres/mod.rs
index 0344cfac..31bd66e6 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/integration/tests/connectors/sources/postgres/mod.rs
@@ -15,25 +15,3 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum McpRuntimeError {
-    #[error("Failed to create service")]
-    FailedToCreateService,
-    #[error("Missing configuration")]
-    MissingConfig,
-    #[error("Failed to start HTTP server")]
-    FailedToStartHttpServer,
-    #[error("Iggy client error")]
-    IggyClient(#[from] iggy::prelude::ClientError),
-    #[error("Iggy error")]
-    IggyError(#[from] iggy::prelude::IggyError),
-    #[error("Missing Iggy credentials")]
-    MissingIggyCredentials,
-    #[error("Failed to create Iggy consumer ID")]
-    FailedToCreateConsumerId,
-    #[error("Invalid API path")]
-    InvalidApiPath,
-}

Reply via email to