This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_test
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_test by this push:
new aa61e536 Improve loading configs for connectors and MCP, add initial e2e suite for connectors
aa61e536 is described below
commit aa61e5364d02bf38ac1334922c6a5c440e2c9654
Author: spetz <[email protected]>
AuthorDate: Fri Sep 19 23:35:17 2025 +0200
Improve loading configs for connectors and MCP, add initial e2e suite for connectors
---
Cargo.lock | 327 ++++++++++++++++++++-
core/ai/mcp/src/error.rs | 2 +
core/ai/mcp/src/main.rs | 17 +-
core/connectors/runtime/README.md | 8 +-
core/connectors/runtime/src/error.rs | 2 +
core/connectors/runtime/src/main.rs | 17 +-
core/integration/Cargo.toml | 1 +
core/integration/src/test_connectors_runtime.rs | 7 +-
core/integration/tests/connectors/mod.rs | 88 +++++-
.../tests/connectors/sinks/mod.rs} | 22 +-
.../tests/connectors/sinks/postgres/mod.rs | 66 +++++
.../tests/connectors/sinks/postgres/postgres.toml | 39 +++
.../tests/connectors/sources/mod.rs} | 22 +-
.../tests/connectors/sources/postgres/mod.rs} | 22 --
14 files changed, 551 insertions(+), 89 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 4a52e543..75de1710 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1040,6 +1040,82 @@ dependencies = [
"generic-array",
]
+[[package]]
+name = "bollard"
+version = "0.19.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "899ca34eb6924d6ec2a77c6f7f5c7339e60fd68235eaf91edd5a15f12958bb06"
+dependencies = [
+ "async-stream",
+ "base64 0.22.1",
+ "bitflags 2.9.4",
+ "bollard-buildkit-proto",
+ "bollard-stubs",
+ "bytes",
+ "chrono",
+ "futures-core",
+ "futures-util",
+ "hex",
+ "home",
+ "http 1.3.1",
+ "http-body-util",
+ "hyper",
+ "hyper-named-pipe",
+ "hyper-rustls",
+ "hyper-util",
+ "hyperlocal",
+ "log",
+ "num",
+ "pin-project-lite",
+ "rand 0.9.2",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "serde_repr",
+ "serde_urlencoded",
+ "thiserror 2.0.16",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tonic 0.13.1",
+ "tower-service",
+ "url",
+ "winapi",
+]
+
+[[package]]
+name = "bollard-buildkit-proto"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40b3e79f8bd0f25f32660e3402afca46fd91bebaf135af017326d905651f8107"
+dependencies = [
+ "prost 0.13.5",
+ "prost-types 0.13.5",
+ "tonic 0.13.1",
+ "ureq",
+]
+
+[[package]]
+name = "bollard-stubs"
+version = "1.48.3-rc.28.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64ea257e555d16a2c01e5593f40b73865cdf12efbceda33c6d14a2d8d1490368"
+dependencies = [
+ "base64 0.22.1",
+ "bollard-buildkit-proto",
+ "bytes",
+ "chrono",
+ "prost 0.13.5",
+ "serde",
+ "serde_json",
+ "serde_repr",
+ "serde_with",
+]
+
[[package]]
name = "bon"
version = "3.7.2"
@@ -2192,6 +2268,17 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
+[[package]]
+name = "docker_credential"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8"
+dependencies = [
+ "base64 0.21.7",
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "document-features"
version = "0.2.11"
@@ -2394,6 +2481,17 @@ dependencies = [
"windows-sys 0.48.0",
]
+[[package]]
+name = "etcetera"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6"
+dependencies = [
+ "cfg-if",
+ "home",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "event-listener"
version = "5.4.1"
@@ -2498,6 +2596,18 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "filetime"
+version = "0.2.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc0505cd1b6fa6580283f6bdf70a73fcf4aba1184038c90902b92b3dd0df63ed"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "libredox",
+ "windows-sys 0.60.2",
+]
+
[[package]]
name = "find-msvc-tools"
version = "0.1.1"
@@ -3592,6 +3702,21 @@ dependencies = [
"want",
]
+[[package]]
+name = "hyper-named-pipe"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278"
+dependencies = [
+ "hex",
+ "hyper",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+ "winapi",
+]
+
[[package]]
name = "hyper-rustls"
version = "0.27.7"
@@ -3648,6 +3773,21 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "hyperlocal"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7"
+dependencies = [
+ "hex",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+]
+
[[package]]
name = "iana-time-zone"
version = "0.1.64"
@@ -4276,6 +4416,7 @@ dependencies = [
"server",
"tempfile",
"test-case",
+ "testcontainers-modules",
"tokio",
"twox-hash",
"uuid",
@@ -5127,6 +5268,20 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "num"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23"
+dependencies = [
+ "num-bigint",
+ "num-complex",
+ "num-integer",
+ "num-iter",
+ "num-rational",
+ "num-traits",
+]
+
[[package]]
name = "num-bigint"
version = "0.4.6"
@@ -5154,6 +5309,15 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "num-complex"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
+dependencies = [
+ "num-traits",
+]
+
[[package]]
name = "num-conv"
version = "0.1.0"
@@ -5195,6 +5359,17 @@ dependencies = [
"num-modular",
]
+[[package]]
+name = "num-rational"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
+dependencies = [
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
[[package]]
name = "num-traits"
version = "0.2.19"
@@ -5529,6 +5704,31 @@ dependencies = [
"windows-targets 0.52.6",
]
+[[package]]
+name = "parse-display"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a"
+dependencies = [
+ "parse-display-derive",
+ "regex",
+ "regex-syntax 0.8.6",
+]
+
+[[package]]
+name = "parse-display-derive"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "regex",
+ "regex-syntax 0.8.6",
+ "structmeta",
+ "syn 2.0.106",
+]
+
[[package]]
name = "passterm"
version = "2.0.1"
@@ -6318,6 +6518,15 @@ dependencies = [
"bitflags 1.3.2",
]
+[[package]]
+name = "redox_syscall"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
[[package]]
name = "redox_syscall"
version = "0.5.17"
@@ -7076,6 +7285,17 @@ dependencies = [
"serde_core",
]
+[[package]]
+name = "serde_repr"
+version = "0.1.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
[[package]]
name = "serde_spanned"
version = "0.6.9"
@@ -7596,7 +7816,7 @@ dependencies = [
"chrono",
"crc",
"dotenvy",
- "etcetera",
+ "etcetera 0.8.0",
"futures-channel",
"futures-core",
"futures-util",
@@ -7704,6 +7924,29 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
+[[package]]
+name = "structmeta"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "structmeta-derive",
+ "syn 2.0.106",
+]
+
+[[package]]
+name = "structmeta-derive"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
[[package]]
name = "strum"
version = "0.27.2"
@@ -7907,6 +8150,45 @@ dependencies = [
"test-case-core",
]
+[[package]]
+name = "testcontainers"
+version = "0.25.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b92bce247dc9260a19808321e11b51ea6a0293d02b48ab1c6578960610cfa2a7"
+dependencies = [
+ "async-trait",
+ "bollard",
+ "bollard-stubs",
+ "bytes",
+ "docker_credential",
+ "either",
+ "etcetera 0.10.0",
+ "futures",
+ "log",
+ "memchr",
+ "parse-display",
+ "pin-project-lite",
+ "serde",
+ "serde_json",
+ "serde_with",
+ "thiserror 2.0.16",
+ "tokio",
+ "tokio-stream",
+ "tokio-tar",
+ "tokio-util",
+ "ulid",
+ "url",
+]
+
+[[package]]
+name = "testcontainers-modules"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1966329d5bb3f89d33602d2db2da971fb839f9297dad16527abf4564e2ae0a6d"
+dependencies = [
+ "testcontainers",
+]
+
[[package]]
name = "textwrap"
version = "0.16.2"
@@ -8086,6 +8368,21 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tokio-tar"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75"
+dependencies = [
+ "filetime",
+ "futures-core",
+ "libc",
+ "redox_syscall 0.3.5",
+ "tokio",
+ "tokio-stream",
+ "xattr",
+]
+
[[package]]
name = "tokio-util"
version = "0.7.16"
@@ -8240,8 +8537,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
+ "axum 0.8.4",
"base64 0.22.1",
"bytes",
+ "h2 0.4.12",
"http 1.3.1",
"http-body",
"http-body-util",
@@ -8251,6 +8550,7 @@ dependencies = [
"percent-encoding",
"pin-project",
"prost 0.13.5",
+ "socket2 0.5.10",
"tokio",
"tokio-stream",
"tower 0.5.2",
@@ -8602,6 +8902,21 @@ version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
+[[package]]
+name = "ureq"
+version = "2.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d"
+dependencies = [
+ "base64 0.22.1",
+ "log",
+ "once_cell",
+ "rustls",
+ "rustls-pki-types",
+ "url",
+ "webpki-roots 0.26.11",
+]
+
[[package]]
name = "url"
version = "2.5.7"
@@ -9521,6 +9836,16 @@ dependencies = [
"tap",
]
+[[package]]
+name = "xattr"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af3a19837351dc82ba89f8a125e22a3c475f05aba604acc023d62b2739ae2909"
+dependencies = [
+ "libc",
+ "rustix",
+]
+
[[package]]
name = "yaml-rust2"
version = "0.10.4"
diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs
index 0344cfac..0a8c0363 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/ai/mcp/src/error.rs
@@ -36,4 +36,6 @@ pub enum McpRuntimeError {
FailedToCreateConsumerId,
#[error("Invalid API path")]
InvalidApiPath,
+ #[error("Configuration not found: {0}")]
+ ConfigurationNotFound(String),
}
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index 1337ce0b..b118720d 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -51,7 +51,22 @@ async fn main() -> Result<(), McpRuntimeError> {
);
}
- let config_path = env::var("IGGY_MCP_CONFIG_PATH").unwrap_or_else(|_| "config".to_string());
+ let config_path = env::var("IGGY_MCP_CONFIG_PATH");
+ if let Ok(ref path) = config_path {
+ let config_with_extension = if path.contains('.') {
+ path.to_owned()
+ } else {
+ format!("{path}.toml")
+ };
+ eprintln!("Checking if config path exists: {config_with_extension}...");
+ if !std::fs::exists(&config_with_extension).unwrap_or_default() {
+ return Err(McpRuntimeError::ConfigurationNotFound(
+ config_with_extension,
+ ));
+ }
+ }
+
+ let config_path = config_path.unwrap_or_else(|_| "config".to_string());
eprintln!("Configuration file path: {config_path}");
let config: McpServerConfig = Config::builder()
.add_source(Config::try_from(&McpServerConfig::default()).expect("Failed to init config"))
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
index bee66d27..34019292 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -33,15 +33,15 @@ Keep in mind that either of `toml`, `yaml`, or `json` formats are supported for
## HTTP API
-Connector runtime has an optional HTTP API that can be enabled by setting the `enabled` flag to `true` in the `[http_api]` section.
+Connector runtime has an optional HTTP API that can be enabled by setting the `enabled` flag to `true` in the `[http]` section.
```toml
-[http_api] # Optional HTTP API configuration
+[http] # Optional HTTP API configuration
enabled = true
address = "127.0.0.1:8081"
# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header
-[http_api.cors] # Optional CORS configuration for HTTP API
+[http.cors] # Optional CORS configuration for HTTP API
enabled = false
allowed_methods = ["GET", "POST", "PUT", "DELETE"]
allowed_origins = ["*"]
@@ -50,7 +50,7 @@ exposed_headers = [""]
allow_credentials = false
allow_private_network = false
-[http_api.tls] # Optional TLS configuration for HTTP API
+[http.tls] # Optional TLS configuration for HTTP API
enabled = false
cert_file = "core/certs/iggy_cert.pem"
key_file = "core/certs/iggy_key.pem"
diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs
index c622feab..f8674df8 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -22,6 +22,8 @@ use thiserror::Error;
pub enum RuntimeError {
#[error("Invalid configuration: {0}")]
InvalidConfiguration(String),
+ #[error("Configuration not found: {0}")]
+ ConfigurationNotFound(String),
#[error("Failed to serialize topic metadata")]
FailedToSerializeTopicMetadata,
#[error("Failed to serialize messages metadata")]
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 7b5d88a5..4ca3369e 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -108,9 +108,22 @@ async fn main() -> Result<(), RuntimeError> {
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
.init();
- let config_path =
- env::var("IGGY_CONNECTORS_CONFIG_PATH").unwrap_or_else(|_| "config".to_string());
+ let config_path = env::var("IGGY_CONNECTORS_CONFIG_PATH");
+ if let Ok(ref path) = config_path {
+ let config_with_extension = if path.contains('.') {
+ path.to_owned()
+ } else {
+ format!("{path}.toml")
+ };
+ info!("Checking if config path exists: {config_with_extension}...");
+ if !std::fs::exists(&config_with_extension).unwrap_or_default() {
+ return Err(RuntimeError::ConfigurationNotFound(config_with_extension));
+ }
+ }
+
+ let config_path = config_path.unwrap_or_else(|_| "config".to_string());
info!("Starting Iggy Connectors Runtime, loading configuration from: {config_path}...");
+
let config: RuntimeConfig = Config::builder()
.add_source(Config::try_from(&RuntimeConfig::default()).expect("Failed to init config"))
.add_source(File::with_name(&config_path).required(false))
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 61be6918..5e86a564 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -58,6 +58,7 @@ serial_test = { workspace = true }
server = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
+testcontainers-modules = { version = "0.13.0", features = ["postgres"] }
tokio = { workspace = true }
twox-hash = { workspace = true }
uuid = { workspace = true }
diff --git a/core/integration/src/test_connectors_runtime.rs b/core/integration/src/test_connectors_runtime.rs
index bf175eca..d28f67a7 100644
--- a/core/integration/src/test_connectors_runtime.rs
+++ b/core/integration/src/test_connectors_runtime.rs
@@ -40,8 +40,11 @@ pub struct TestConnectorsRuntime {
}
impl TestConnectorsRuntime {
- pub fn with_iggy_address(iggy_tcp_server_address: &str) -> Self {
- Self::new(iggy_tcp_server_address, None, None)
+ pub fn with_iggy_address(
+ iggy_tcp_server_address: &str,
+ envs: Option<HashMap<String, String>>,
+ ) -> Self {
+ Self::new(iggy_tcp_server_address, envs, None)
}
pub fn new(
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 2689cdae..789ca3ed 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -16,40 +16,98 @@
* under the License.
*/
+use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, IggyClient};
+use iggy_binary_protocol::UserClient;
use integration::{
+ tcp_client::TcpClientFactory,
test_connectors_runtime::TestConnectorsRuntime,
- test_server::{IpAddrKind, TestServer},
+ test_server::{ClientFactory, IpAddrKind, TestServer},
};
use serial_test::parallel;
use std::collections::HashMap;
+mod sinks;
+mod sources;
+
#[tokio::test]
#[parallel]
async fn connectors_runtime_should_start() {
- setup().await;
+ let mut infra = setup();
+ infra.start_connectors_runtime(None, None).await;
}
-async fn setup() -> ConnectorsInfra {
+fn setup() -> ConnectorsInfra {
let mut iggy_envs = HashMap::new();
iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned());
let mut test_server = TestServer::new(Some(iggy_envs), true, None, IpAddrKind::V4);
test_server.start();
- let iggy_server_address = test_server
- .get_raw_tcp_addr()
- .expect("Failed to get Iggy TCP address");
-
- let mut connectors_runtime = TestConnectorsRuntime::with_iggy_address(&iggy_server_address);
- connectors_runtime.start();
- connectors_runtime.ensure_started().await;
-
ConnectorsInfra {
- _iggy_server: test_server,
- _connectors_runtime: connectors_runtime,
+ iggy_server: test_server,
+ connectors_runtim: None,
}
}
#[derive(Debug)]
struct ConnectorsInfra {
- _iggy_server: TestServer,
- _connectors_runtime: TestConnectorsRuntime,
+ iggy_server: TestServer,
+ connectors_runtim: Option<TestConnectorsRuntime>,
+}
+
+impl ConnectorsInfra {
+ pub async fn start_connectors_runtime(
+ &mut self,
+ config_path: Option<&str>,
+ envs: Option<HashMap<String, String>>,
+ ) {
+ if self.connectors_runtim.is_some() {
+ return;
+ }
+
+ let mut all_envs = None;
+ if let Some(config_path) = config_path {
+ let config_path = format!("tests/connectors/{config_path}");
+ let mut map = HashMap::new();
+ map.insert(
+ "IGGY_CONNECTORS_CONFIG_PATH".to_owned(),
+ config_path.to_owned(),
+ );
+ all_envs = Some(map);
+ }
+
+ if let Some(envs) = envs {
+ for (k, v) in envs {
+ all_envs
+ .get_or_insert_with(HashMap::new)
+ .insert(k.to_owned(), v.to_owned());
+ }
+ }
+
+ let iggy_server_address = self
+ .iggy_server
+ .get_raw_tcp_addr()
+ .expect("Failed to get Iggy TCP address");
+ let mut connectors_runtime =
+ TestConnectorsRuntime::with_iggy_address(&iggy_server_address, all_envs);
+ connectors_runtime.start();
+ connectors_runtime.ensure_started().await;
+ self.connectors_runtim = Some(connectors_runtime);
+ }
+
+ pub async fn create_client(&self) -> IggyClient {
+ let server_addr = self
+ .iggy_server
+ .get_raw_tcp_addr()
+ .expect("Failed to get Iggy TCP address");
+ let client = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ }
+ .create_client()
+ .await;
+ client
+ .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+ .await
+ .expect("Failed to login as root user");
+ IggyClient::create(client, None, None)
+ }
}
diff --git a/core/ai/mcp/src/error.rs b/core/integration/tests/connectors/sinks/mod.rs
similarity index 55%
copy from core/ai/mcp/src/error.rs
copy to core/integration/tests/connectors/sinks/mod.rs
index 0344cfac..9c64d589 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/integration/tests/connectors/sinks/mod.rs
@@ -16,24 +16,4 @@
* under the License.
*/
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum McpRuntimeError {
- #[error("Failed to create service")]
- FailedToCreateService,
- #[error("Missing configuration")]
- MissingConfig,
- #[error("Failed to start HTTP server")]
- FailedToStartHttpServer,
- #[error("Iggy client error")]
- IggyClient(#[from] iggy::prelude::ClientError),
- #[error("Iggy error")]
- IggyError(#[from] iggy::prelude::IggyError),
- #[error("Missing Iggy credentials")]
- MissingIggyCredentials,
- #[error("Failed to create Iggy consumer ID")]
- FailedToCreateConsumerId,
- #[error("Invalid API path")]
- InvalidApiPath,
-}
+mod postgres;
diff --git a/core/integration/tests/connectors/sinks/postgres/mod.rs b/core/integration/tests/connectors/sinks/postgres/mod.rs
new file mode 100644
index 00000000..8ab8fc8e
--- /dev/null
+++ b/core/integration/tests/connectors/sinks/postgres/mod.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::setup;
+use iggy_binary_protocol::{StreamClient, TopicClient};
+use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
+use std::collections::HashMap;
+use testcontainers_modules::{postgres, testcontainers::runners::AsyncRunner};
+
+#[tokio::test]
+async fn given_valid_configuration_postgres_sink_should_start() {
+ let container = postgres::Postgres::default()
+ .start()
+ .await
+ .expect("Failed to start Postgres");
+ let host_port = container
+ .get_host_port_ipv4(5432)
+ .await
+ .expect("Failed to get Postgres port");
+
+ let mut envs = HashMap::new();
+ envs.insert(
+ "IGGY_CONNECTORS_SINKS_POSTGRES_CONFIG_CONNECTION_STRING".to_owned(),
+ format!("postgres://postgres:postgres@localhost:{host_port}"),
+ );
+ let mut infra = setup();
+ let client = infra.create_client().await;
+ let stream_name = "test";
+ let topic_name = "test";
+ client
+ .create_stream("test", None)
+ .await
+ .expect("Failed to create stream");
+ let stream_id: Identifier = stream_name.try_into().expect("Invalid stream name");
+ client
+ .create_topic(
+ &stream_id,
+ topic_name,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ None,
+ IggyExpiry::ServerDefault,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .expect("Failed to create topic");
+ infra
+ .start_connectors_runtime(Some("sinks/postgres/postgres.toml"), Some(envs))
+ .await;
+}
diff --git a/core/integration/tests/connectors/sinks/postgres/postgres.toml b/core/integration/tests/connectors/sinks/postgres/postgres.toml
new file mode 100644
index 00000000..3dc2b1de
--- /dev/null
+++ b/core/integration/tests/connectors/sinks/postgres/postgres.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[sinks.postgres]
+enabled = true
+name = "Postgres sink"
+path = "../../target/debug/libiggy_connector_postgres_sink"
+
+[[sinks.postgres.streams]]
+stream = "test"
+topics = ["test"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "test"
+
+[sinks.postgres.config]
+connection_string = ""
+target_table = "iggy_messages"
+batch_size = 100
+max_connections = 10
+auto_create_table = true
+include_metadata = true
+include_checksum = true
+include_origin_timestamp = true
diff --git a/core/ai/mcp/src/error.rs b/core/integration/tests/connectors/sources/mod.rs
similarity index 55%
copy from core/ai/mcp/src/error.rs
copy to core/integration/tests/connectors/sources/mod.rs
index 0344cfac..9c64d589 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/integration/tests/connectors/sources/mod.rs
@@ -16,24 +16,4 @@
* under the License.
*/
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum McpRuntimeError {
- #[error("Failed to create service")]
- FailedToCreateService,
- #[error("Missing configuration")]
- MissingConfig,
- #[error("Failed to start HTTP server")]
- FailedToStartHttpServer,
- #[error("Iggy client error")]
- IggyClient(#[from] iggy::prelude::ClientError),
- #[error("Iggy error")]
- IggyError(#[from] iggy::prelude::IggyError),
- #[error("Missing Iggy credentials")]
- MissingIggyCredentials,
- #[error("Failed to create Iggy consumer ID")]
- FailedToCreateConsumerId,
- #[error("Invalid API path")]
- InvalidApiPath,
-}
+mod postgres;
diff --git a/core/ai/mcp/src/error.rs b/core/integration/tests/connectors/sources/postgres/mod.rs
similarity index 55%
copy from core/ai/mcp/src/error.rs
copy to core/integration/tests/connectors/sources/postgres/mod.rs
index 0344cfac..31bd66e6 100644
--- a/core/ai/mcp/src/error.rs
+++ b/core/integration/tests/connectors/sources/postgres/mod.rs
@@ -15,25 +15,3 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum McpRuntimeError {
- #[error("Failed to create service")]
- FailedToCreateService,
- #[error("Missing configuration")]
- MissingConfig,
- #[error("Failed to start HTTP server")]
- FailedToStartHttpServer,
- #[error("Iggy client error")]
- IggyClient(#[from] iggy::prelude::ClientError),
- #[error("Iggy error")]
- IggyError(#[from] iggy::prelude::IggyError),
- #[error("Missing Iggy credentials")]
- MissingIggyCredentials,
- #[error("Failed to create Iggy consumer ID")]
- FailedToCreateConsumerId,
- #[error("Invalid API path")]
- InvalidApiPath,
-}