This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new d64941ead fix(connectors): improve integration tests (#2382)
d64941ead is described below
commit d64941ead4c5d5559ba0ada7fd6651c872a63247
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Wed Nov 19 22:25:16 2025 +0100
fix(connectors): improve integration tests (#2382)
---
README.md | 25 ++++++++----
core/connectors/runtime/src/main.rs | 5 +++
core/integration/tests/connectors/mod.rs | 47 ++++++++++++++++++++--
.../tests/connectors/postgres/config.toml | 2 +-
.../postgres/connectors_config/postgres.toml | 4 +-
core/integration/tests/connectors/postgres/mod.rs | 7 ++--
.../tests/connectors/postgres/postgres_sink.rs | 4 +-
.../connectors/{postgres => random}/config.toml | 2 +-
.../connectors_config/random.toml} | 37 +++++++++--------
.../{postgres/postgres_sink.rs => random/mod.rs} | 15 ++++---
.../postgres_sink.rs => random/random_source.rs} | 23 +++++++++--
helm/charts/iggy/Chart.yaml | 4 +-
12 files changed, 125 insertions(+), 50 deletions(-)
diff --git a/README.md b/README.md
index e24993784..c30747810 100644
--- a/README.md
+++ b/README.md
@@ -137,14 +137,16 @@ There's a dedicated Web UI for the server, which allows managing the streams, to
The highly performant and modular **[runtime](https://github.com/apache/iggy/tree/master/core/connectors)** for statically typed, yet dynamically loaded connectors. Ingest the data from the external sources and push it further to the Iggy streams, or fetch the data from the Iggy streams and push it further to the external sources. **Create your own Rust plugins** by simply implementing either the `Source` or `Sink` trait and **build custom pipelines for the data processing**.
```toml
-## Configure a sink or source connector, depending on your needs
-[sinks.quickwit]
+## Configure a sink or source connector, depending on your needs in its own config file.
+type = "sink"
+key = "quickwit"
enabled = true
+version = 0
name = "Quickwit sink"
path = "target/release/libiggy_connector_quickwit_sink"
-config_format = "yaml"
+plugin_config_format = "yaml"
-[[sinks.quickwit.streams]]
+[[streams]]
stream = "qw"
topics = ["records"]
schema = "json"
@@ -152,11 +154,18 @@ batch_length = 1000
poll_interval = "5ms"
consumer_group = "qw_sink_connector"
-[[sinks.quickwit.transforms.add_fields.fields]]
-key = "random_id"
-value.computed = "uuid_v7"
+[transforms.add_fields]
+enabled = true
+
+[[transforms.add_fields.fields]]
+key = "service_name"
+value.static = "qw_connector"
+
+[[transforms.add_fields.fields]]
+key = "timestamp"
+value.computed = "timestamp_millis"
-[sinks.quickwit.transforms.delete_fields]
+[transforms.delete_fields]
enabled = true
fields = ["email", "created_at"]
```
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index a266921b1..8da7add28 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -130,6 +130,11 @@ async fn main() -> Result<(), RuntimeError> {
create_connectors_config_provider(&config.connectors).await?;
let connectors_config =
connectors_config_provider.get_active_configs().await?;
+ info!(
+ "Found {} source and {} sink configurations.",
+ connectors_config.sources().len(),
+ connectors_config.sinks().len()
+ );
let sources_config = connectors_config.sources();
let sources = source::init(
sources_config.clone(),
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 23499a2ac..d90b2902c 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -17,8 +17,8 @@
*/
use iggy::prelude::{Client, DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, IggyClient};
-use iggy_binary_protocol::{StreamClient, TopicClient, UserClient};
-use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize};
+use iggy_binary_protocol::{MessageClient, StreamClient, TopicClient, UserClient};
+use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize, PolledMessages};
use integration::{
tcp_client::TcpClientFactory,
test_connectors_runtime::TestConnectorsRuntime,
@@ -27,6 +27,7 @@ use integration::{
use std::collections::HashMap;
mod postgres;
+mod random;
const DEFAULT_TEST_STREAM: &str = "test_stream";
const DEFAULT_TEST_TOPIC: &str = "test_topic";
@@ -34,20 +35,48 @@ const DEFAULT_TEST_TOPIC: &str = "test_topic";
fn setup_runtime() -> ConnectorsRuntime {
let mut iggy_envs = HashMap::new();
iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned());
+ iggy_envs.insert("IGGY_WEBSOCKET_ENABLED".to_owned(), "false".to_owned());
let mut test_server = TestServer::new(Some(iggy_envs), true, None, IpAddrKind::V4);
test_server.start();
ConnectorsRuntime {
iggy_server: test_server,
connectors_runtime: None,
+ stream: "".to_owned(),
+ topic: "".to_owned(),
}
}
#[derive(Debug)]
struct ConnectorsRuntime {
+ stream: String,
+ topic: String,
iggy_server: TestServer,
connectors_runtime: Option<TestConnectorsRuntime>,
}
+#[derive(Debug)]
+struct ConnectorsIggyClient {
+ stream: String,
+ topic: String,
+ client: IggyClient,
+}
+
+impl ConnectorsIggyClient {
+ async fn get_messages(&self) -> Result<PolledMessages, iggy_common::IggyError> {
+ self.client
+ .poll_messages(
+ &self.stream.clone().try_into().unwrap(),
+ &self.topic.clone().try_into().unwrap(),
+ None,
+ &iggy_common::Consumer::new("test_consumer".try_into().unwrap()),
+ &iggy_common::PollingStrategy::next(),
+ 10,
+ true,
+ )
+ .await
+ }
+}
+
#[derive(Debug)]
pub struct IggySetup {
pub stream: String,
@@ -87,7 +116,7 @@ impl ConnectorsRuntime {
}
}
- let client = self.create_client().await;
+ let client = self.create_iggy_client().await;
client
.create_stream(&iggy_setup.stream)
.await
@@ -118,10 +147,20 @@ impl ConnectorsRuntime {
TestConnectorsRuntime::with_iggy_address(&iggy_server_address, Some(all_envs));
connectors_runtime.start();
connectors_runtime.ensure_started().await;
+ self.stream = self.stream.clone();
+ self.topic = self.topic.clone();
self.connectors_runtime = Some(connectors_runtime);
}
- async fn create_client(&self) -> IggyClient {
+ pub async fn create_client(&self) -> ConnectorsIggyClient {
+ ConnectorsIggyClient {
+ stream: DEFAULT_TEST_STREAM.to_owned(),
+ topic: DEFAULT_TEST_TOPIC.to_owned(),
+ client: self.create_iggy_client().await,
+ }
+ }
+
+ async fn create_iggy_client(&self) -> IggyClient {
let server_addr = self
.iggy_server
.get_raw_tcp_addr()
diff --git a/core/integration/tests/connectors/postgres/config.toml b/core/integration/tests/connectors/postgres/config.toml
index fff72e37e..7336b47b1 100644
--- a/core/integration/tests/connectors/postgres/config.toml
+++ b/core/integration/tests/connectors/postgres/config.toml
@@ -17,4 +17,4 @@
[connectors]
config_type = "local"
-config_dir = "postgres/connectors_config"
+config_dir = "tests/connectors/postgres/connectors_config"
diff --git a/core/integration/tests/connectors/postgres/connectors_config/postgres.toml b/core/integration/tests/connectors/postgres/connectors_config/postgres.toml
index 7602cd596..7476a2126 100644
--- a/core/integration/tests/connectors/postgres/connectors_config/postgres.toml
+++ b/core/integration/tests/connectors/postgres/connectors_config/postgres.toml
@@ -23,8 +23,8 @@ name = "Postgres sink"
path = "../../target/debug/libiggy_connector_postgres_sink"
[[streams]]
-stream = ""
-topics = [""]
+stream = "test_stream"
+topics = ["test_topic"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
diff --git a/core/integration/tests/connectors/postgres/mod.rs b/core/integration/tests/connectors/postgres/mod.rs
index 75acc741e..c18900102 100644
--- a/core/integration/tests/connectors/postgres/mod.rs
+++ b/core/integration/tests/connectors/postgres/mod.rs
@@ -17,14 +17,13 @@
* under the License.
*/
-use crate::connectors::{IggySetup, setup_runtime};
-use iggy::prelude::IggyClient;
+use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime};
use std::collections::HashMap;
use testcontainers_modules::{postgres, testcontainers::runners::AsyncRunner};
mod postgres_sink;
-async fn setup() -> IggyClient {
+async fn setup() -> ConnectorsRuntime {
let container = postgres::Postgres::default()
.start()
.await
@@ -57,5 +56,5 @@ async fn setup() -> IggyClient {
runtime
.init("postgres/config.toml", Some(envs), iggy_setup)
.await;
- runtime.create_client().await
+ runtime
}
diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs b/core/integration/tests/connectors/postgres/postgres_sink.rs
index 5c697a938..ee05048b2 100644
--- a/core/integration/tests/connectors/postgres/postgres_sink.rs
+++ b/core/integration/tests/connectors/postgres/postgres_sink.rs
@@ -19,6 +19,6 @@
use crate::connectors::postgres::setup;
#[tokio::test]
-async fn given_valid_configuration_postgres_sink_should_start() {
- let _client = setup().await;
+async fn given_valid_configuration_postgres_sink_connector_should_start() {
+ let _runtime = setup().await;
}
diff --git a/core/integration/tests/connectors/postgres/config.toml b/core/integration/tests/connectors/random/config.toml
similarity index 93%
copy from core/integration/tests/connectors/postgres/config.toml
copy to core/integration/tests/connectors/random/config.toml
index fff72e37e..544b2bd63 100644
--- a/core/integration/tests/connectors/postgres/config.toml
+++ b/core/integration/tests/connectors/random/config.toml
@@ -17,4 +17,4 @@
[connectors]
config_type = "local"
-config_dir = "postgres/connectors_config"
+config_dir = "tests/connectors/random/connectors_config"
diff --git a/core/integration/tests/connectors/postgres/connectors_config/postgres.toml b/core/integration/tests/connectors/random/connectors_config/random.toml
similarity index 66%
copy from core/integration/tests/connectors/postgres/connectors_config/postgres.toml
copy to core/integration/tests/connectors/random/connectors_config/random.toml
index 7602cd596..fb219ab7e 100644
--- a/core/integration/tests/connectors/postgres/connectors_config/postgres.toml
+++ b/core/integration/tests/connectors/random/connectors_config/random.toml
@@ -15,27 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-type = "sink"
-key = "postgres"
+type = "source"
+key = "random"
enabled = true
version = 0
-name = "Postgres sink"
-path = "../../target/debug/libiggy_connector_postgres_sink"
+name = "Random source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
[[streams]]
-stream = ""
-topics = [""]
+stream = "test_stream"
+topic = "test_topic"
schema = "json"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "test"
+batch_length = 1000
+linger_time = "5ms"
[plugin_config]
-connection_string = ""
-target_table = "iggy_messages"
-batch_size = 100
-max_connections = 10
-auto_create_table = true
-include_metadata = true
-include_checksum = true
-include_origin_timestamp = true
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
+
+[transforms.add_fields]
+enabled = true
+
+[[transforms.add_fields.fields]]
+key = "test_field"
+value.static = "hello!"
diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs b/core/integration/tests/connectors/random/mod.rs
similarity index 67%
copy from core/integration/tests/connectors/postgres/postgres_sink.rs
copy to core/integration/tests/connectors/random/mod.rs
index 5c697a938..627ed1285 100644
--- a/core/integration/tests/connectors/postgres/postgres_sink.rs
+++ b/core/integration/tests/connectors/random/mod.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -16,9 +17,13 @@
* under the License.
*/
-use crate::connectors::postgres::setup;
+use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime};
-#[tokio::test]
-async fn given_valid_configuration_postgres_sink_should_start() {
- let _client = setup().await;
+mod random_source;
+
+async fn setup() -> ConnectorsRuntime {
+ let iggy_setup = IggySetup::default();
+ let mut runtime = setup_runtime();
+ runtime.init("random/config.toml", None, iggy_setup).await;
+ runtime
}
diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs b/core/integration/tests/connectors/random/random_source.rs
similarity index 52%
copy from core/integration/tests/connectors/postgres/postgres_sink.rs
copy to core/integration/tests/connectors/random/random_source.rs
index 5c697a938..b7d6eeccf 100644
--- a/core/integration/tests/connectors/postgres/postgres_sink.rs
+++ b/core/integration/tests/connectors/random/random_source.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -16,9 +17,23 @@
* under the License.
*/
-use crate::connectors::postgres::setup;
+use crate::connectors::random::setup;
+use std::time::Duration;
+use tokio::time::sleep;
#[tokio::test]
-async fn given_valid_configuration_postgres_sink_should_start() {
- let _client = setup().await;
+async fn given_valid_configuration_random_source_connector_should_produce_messages() {
+ let runtime = setup().await;
+ let client = runtime.create_client().await;
+ // Wait for some messages to be produced
+ sleep(Duration::from_secs(1)).await;
+ let messages = client.get_messages().await.expect("Failed to get messages");
+ assert!(
+ !messages.messages.is_empty(),
+ "No messages received from random source"
+ );
+ assert!(
+ messages.current_offset > 0,
+ "Current offset should be greater than 0"
+ );
}
diff --git a/helm/charts/iggy/Chart.yaml b/helm/charts/iggy/Chart.yaml
index 0aa40e8d1..e70b7b953 100644
--- a/helm/charts/iggy/Chart.yaml
+++ b/helm/charts/iggy/Chart.yaml
@@ -20,8 +20,8 @@ apiVersion: v2
name: iggy
description: A Helm chart for Apache Iggy server and web-ui
type: application
-version: 0.2.0
-appVersion: "0.5.0"
+version: 0.3.0
+appVersion: "0.6.0"
sources:
- https://github.com/apache/iggy
keywords: