This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new d850ab6ff test(connectors): add integration tests for Quickwit sink (#2636)
d850ab6ff is described below
commit d850ab6ffd9c062eaaad6d1675ea3d7e7005e147
Author: JoshuaXOng <[email protected]>
AuthorDate: Sat Jan 31 03:33:28 2026 +1100
test(connectors): add integration tests for Quickwit sink (#2636)
---
Cargo.lock | 4 +
core/integration/Cargo.toml | 4 +
core/integration/tests/connectors/mod.rs | 1 +
.../tests/connectors/quickwit/config.toml | 20 ++
core/integration/tests/connectors/quickwit/mod.rs | 285 +++++++++++++++++++++
.../tests/connectors/quickwit/quickwit_sink.rs | 153 +++++++++++
.../tests/connectors/quickwit/sinks/quickwit.toml | 36 +++
7 files changed, 503 insertions(+)
diff --git a/Cargo.lock b/Cargo.lock
index fc1ce8c7f..dfe41e936 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4988,6 +4988,7 @@ name = "integration"
version = "0.0.1"
dependencies = [
"ahash 0.8.12",
+ "anyhow",
"assert_cmd",
"async-trait",
"bon",
@@ -5014,6 +5015,8 @@ dependencies = [
"rand 0.9.2",
"rcgen",
"reqwest",
+ "reqwest-middleware",
+ "reqwest-retry",
"rmcp",
"serde",
"serde_json",
@@ -5027,6 +5030,7 @@ dependencies = [
"test-case",
"testcontainers-modules",
"tokio",
+ "tracing",
"twox-hash",
"uuid",
"zip",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 77b4a7d37..e0eef83e6 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -28,6 +28,7 @@ ci-qemu = []
[dependencies]
ahash = { workspace = true }
+anyhow = { workspace = true }
assert_cmd = "2.1.2"
async-trait = { workspace = true }
bon = { workspace = true }
@@ -54,6 +55,8 @@ predicates = { workspace = true }
rand = { workspace = true }
rcgen = "0.14.7"
reqwest = { workspace = true }
+reqwest-middleware = { workspace = true }
+reqwest-retry = { workspace = true }
rmcp = { version = "0.14.0", features = [
"client",
"reqwest",
@@ -72,6 +75,7 @@ tempfile = { workspace = true }
test-case = { workspace = true }
testcontainers-modules = { version = "0.14.0", features = ["postgres"] }
tokio = { workspace = true }
+tracing = { workspace = true }
twox-hash = { workspace = true }
uuid = { workspace = true }
zip = { workspace = true }
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 28ca40809..621b36d4d 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -36,6 +36,7 @@ use std::collections::HashMap;
mod api;
mod http_config_provider;
mod postgres;
+mod quickwit;
mod random;
pub const DEFAULT_TEST_STREAM: &str = "test_stream";
diff --git a/core/integration/tests/connectors/quickwit/config.toml b/core/integration/tests/connectors/quickwit/config.toml
new file mode 100644
index 000000000..7a2495592
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/config.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/quickwit/sinks"
diff --git a/core/integration/tests/connectors/quickwit/mod.rs b/core/integration/tests/connectors/quickwit/mod.rs
new file mode 100644
index 000000000..cfc1f0696
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/mod.rs
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod quickwit_sink;
+
+use anyhow::{Result, anyhow};
+use reqwest_middleware::ClientWithMiddleware as HttpClient;
+use reqwest_retry::RetryTransientMiddleware;
+use reqwest_retry::policies::ExponentialBackoff;
+use std::collections::HashMap;
+use tracing::info;
+use uuid::Uuid;
+
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt};
+
+use crate::connectors::{
+    ConnectorsRuntime, DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, IggySetup, setup_runtime,
+};
+
+mod quickwit_container {
+ pub static REPOSITORY: &str = "quickwit/quickwit";
+ pub static VERSION: &str = "0.8.1";
+ pub static LISTENING_IPV4: &str = "0.0.0.0";
+ pub static LISTENING_PORT: u16 = 7280;
+ pub static READY_MESSAGE: &str = "REST server is ready";
+}
+
+mod quickwit_paths {
+ pub static INDEXES: &str = "api/v1/indexes";
+
+ pub fn index_ingest(index_id: &str) -> String {
+ format!("api/v1/{index_id}/ingest")
+ }
+
+ pub fn index_search(index_id: &str) -> String {
+ format!("api/v1/{index_id}/search")
+ }
+}
+
+mod quickwit_responses {
+ use serde::Deserialize;
+
+ #[derive(Deserialize)]
+ pub struct IndexSearch {
+ pub hits: Vec<serde_json::Value>,
+ pub num_hits: usize,
+ #[allow(dead_code)]
+ pub elapsed_time_micros: usize,
+ }
+}
+
+struct QuickwitTestSetup {
+ quickwit: ContainerAsync<GenericImage>,
+ runtime: ConnectorsRuntime,
+ reqwest: HttpClient,
+}
+
+impl QuickwitTestSetup {
+ async fn try_new() -> Result<Self> {
+ let quickwit_container = start_quickwit_container().await;
+        let connectors_runtime = start_iggy_with_quickwit_connector(&quickwit_container).await?;
+ Ok(Self {
+ quickwit: quickwit_container,
+ runtime: connectors_runtime,
+ reqwest: get_http_client_with_retries(),
+ })
+ }
+
+ async fn try_new_with_precreate_index(
+ quickwit_container: ContainerAsync<GenericImage>,
+ ) -> Result<Self> {
+ let http_client = get_http_client_with_retries();
+ create_quickwit_test_index(&quickwit_container, &http_client).await?;
+        let connectors_runtime = start_iggy_with_quickwit_connector(&quickwit_container).await?;
+ Ok(Self {
+ quickwit: quickwit_container,
+ runtime: connectors_runtime,
+ reqwest: http_client,
+ })
+ }
+
+ async fn get_mapped_quickwit_port(&self) -> Result<u16> {
+ get_mapped_quickwit_port(&self.quickwit).await
+ }
+
+    async fn get_quickwit_test_index_all_search(&self) -> Result<quickwit_responses::IndexSearch> {
+ let match_all = "";
+ let descending = "-";
+ let search_response = self
+ .reqwest
+ .get(format!(
+ "http://localhost:{}/{}",
+ self.get_mapped_quickwit_port().await?,
+ quickwit_paths::index_search(DEFAULT_TEST_TOPIC)
+ ))
+ .query(&[("query", match_all)])
+            .query(&[("sort_by", format!(r#"{descending}{INDEX_TIMESTAMP_FIELD}"#))])
+ .send()
+ .await?;
+ info!("Received search index response.");
+ assert!(search_response.status().is_success());
+ search_response
+ .json::<quickwit_responses::IndexSearch>()
+ .await
+ .map_err(Into::into)
+ }
+
+ async fn flush_quickwit_test_index(&self) -> Result<()> {
+ let ingest_response = self
+ .reqwest
+ .post(format!(
+ "http://localhost:{}/{}",
+ self.get_mapped_quickwit_port().await?,
+ quickwit_paths::index_ingest(DEFAULT_TEST_TOPIC)
+ ))
+ .query(&[("commit", "force")])
+ .json("{}")
+ .send()
+ .await?;
+ info!("Received index ingest response.");
+ assert!(ingest_response.status().is_success());
+ Ok(())
+ }
+}
+
+fn get_http_client_with_retries() -> HttpClient {
+ let max_retries = 3;
+    let retry_policy = ExponentialBackoff::builder().build_with_max_retries(max_retries);
+ reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
+ .with(RetryTransientMiddleware::new_with_policy(retry_policy))
+ .build()
+}
+
+async fn get_mapped_quickwit_port(
+ quickwit_container: &ContainerAsync<GenericImage>,
+) -> Result<u16> {
+ let mapped_port = quickwit_container
+ .ports()
+ .await?
+ .map_to_host_port_ipv4(quickwit_container::LISTENING_PORT)
+ .ok_or(anyhow!("No mapping for Quickwit port."))?;
+ info!("Got ports details from container.");
+ Ok(mapped_port)
+}
+
+static INDEX_TIMESTAMP_FIELD: &str = "timestamp";
+
+fn get_quickwit_index_config() -> String {
+ format!(
+ r#"
+ version: 0.8
+ index_id: {DEFAULT_TEST_TOPIC}
+ doc_mapping:
+ mode: strict
+ field_mappings:
+ - name: id
+ type: u64
+ - name: name
+ type: text
+ tokenizer: raw
+ - name: count
+ type: u64
+ - name: amount
+ type: f64
+ - name: active
+ type: bool
+ - name: {INDEX_TIMESTAMP_FIELD}
+ type: datetime
+ input_formats:
+ - unix_timestamp
+ output_format: unix_timestamp_micros
+ fast_precision: microseconds
+ fast: true
+ timestamp_field: timestamp
+ retention:
+ period: 7 days
+ schedule: daily
+ "#
+ )
+}
+
+const STREAMS_KEY_PREFIX: &str = "IGGY_CONNECTORS_SINK_QUICKWIT_STREAMS";
+
+fn get_stream_0_overrides() -> HashMap<String, String> {
+ HashMap::from([
+ (
+ format!("{STREAMS_KEY_PREFIX}_0_STREAM"),
+ DEFAULT_TEST_STREAM.to_string(),
+ ),
+ (
+ format!("{STREAMS_KEY_PREFIX}_0_TOPICS"),
+ format!("[{DEFAULT_TEST_TOPIC}]"),
+ ),
+ (format!("{STREAMS_KEY_PREFIX}_0_SCHEMA"), "json".to_string()),
+ ])
+}
+
+async fn create_quickwit_test_index(
+ quickwit_container: &ContainerAsync<GenericImage>,
+ http_client: &HttpClient,
+) -> Result<()> {
+ let create_response = http_client
+ .post(format!(
+ "http://localhost:{}/{}",
+ get_mapped_quickwit_port(quickwit_container).await?,
+ quickwit_paths::INDEXES
+ ))
+ .header("Content-Type", "application/yaml")
+ .body(get_quickwit_index_config())
+ .send()
+ .await?;
+ info!("Received create index response.");
+ assert!(create_response.status().is_success());
+ Ok(())
+}
+
+static NETWORK_NAME_PREFIX: &str = "iggy-quickwit-sink";
+
+async fn start_quickwit_container() -> ContainerAsync<GenericImage> {
+ let unique_network = format!("{NETWORK_NAME_PREFIX}-{}", Uuid::new_v4());
+ let random_host = 0;
+ let quickwit_container =
+        GenericImage::new(quickwit_container::REPOSITORY, quickwit_container::VERSION)
+ .with_exposed_port(random_host.tcp())
+ .with_wait_for(WaitFor::message_on_stdout(
+ quickwit_container::READY_MESSAGE,
+ ))
+ .with_network(unique_network)
+ .with_cmd(["run"])
+            .with_env_var("QW_LISTEN_ADDRESS", quickwit_container::LISTENING_IPV4)
+            .with_mapped_port(random_host, quickwit_container::LISTENING_PORT.tcp())
+ .start()
+ .await
+ .expect("Quickwit started.");
+ info!("Started quickwit container.");
+ quickwit_container
+}
+
+const PLUGIN_KEY_PREFIX: &str = "IGGY_CONNECTORS_SINK_QUICKWIT_PLUGIN_CONFIG";
+
+async fn start_iggy_with_quickwit_connector(
+ quickwit_container: &ContainerAsync<GenericImage>,
+) -> Result<ConnectorsRuntime> {
+ let mapped_port = get_mapped_quickwit_port(quickwit_container).await?;
+
+ let plugin_overrides = HashMap::from([
+ (
+ format!("{PLUGIN_KEY_PREFIX}_URL"),
+ format!("http://localhost:{mapped_port}"),
+ ),
+ (
+ format!("{PLUGIN_KEY_PREFIX}_INDEX"),
+ get_quickwit_index_config(),
+ ),
+ ]);
+ let mut extra_envs = HashMap::new();
+ extra_envs.extend(plugin_overrides);
+ extra_envs.extend(get_stream_0_overrides());
+
+ let mut connectors_runtime = setup_runtime();
+ let runtime_config = "quickwit/config.toml";
+ connectors_runtime
+ .init(runtime_config, Some(extra_envs), IggySetup::default())
+ .await;
+
+ Ok(connectors_runtime)
+}
diff --git a/core/integration/tests/connectors/quickwit/quickwit_sink.rs b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
new file mode 100644
index 000000000..5c8e02baa
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use anyhow::Result;
+use bytes::Bytes;
+use iggy_common::IggyMessage;
+use serde::{Deserialize, Serialize};
+
+use crate::connectors::create_test_messages;
+use crate::connectors::quickwit::{QuickwitTestSetup, start_quickwit_container};
+
+async fn send_test_messages(
+ test_setup: &QuickwitTestSetup,
+ message_count: usize,
+) -> Result<Vec<Bytes>> {
+ let iggy_client = test_setup.runtime.create_client().await;
+
+ let message_payloads = create_test_messages(message_count)
+ .iter()
+ .map(|message| {
+ serde_json::to_vec(message)
+ .map(Bytes::from)
+ .map_err(Into::into)
+ })
+ .collect::<Result<Vec<Bytes>>>()?;
+
+ let mut test_messages = message_payloads
+ .iter()
+ .cloned()
+ .enumerate()
+ .map(|(i, message)| {
+ IggyMessage::builder()
+ .id((i + 1) as u128)
+ .payload(message)
+ .build()
+ .map_err(Into::into)
+ })
+ .collect::<Result<Vec<IggyMessage>>>()?;
+ iggy_client.send_messages(&mut test_messages).await;
+
+ Ok(message_payloads)
+}
+
+async fn assert_test_index_documents_match_message_payloads(
+ test_setup: &QuickwitTestSetup,
+ message_payloads: &[Bytes],
+) -> Result<()> {
+ test_setup.flush_quickwit_test_index().await?;
+
+    let search_response = test_setup.get_quickwit_test_index_all_search().await?;
+ assert_eq!(search_response.num_hits, message_payloads.len());
+
+ for (quickwit_hit, message_payload) in search_response
+ .hits
+ .into_iter()
+ .zip(message_payloads.iter())
+ {
+ assert_eq!(
+ quickwit_hit,
+ serde_json::from_slice::<serde_json::Value>(message_payload)?
+ );
+ }
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn given_existent_quickwit_index_should_store() -> Result<()> {
+ let quickwit_container = start_quickwit_container().await;
+    let test_setup = QuickwitTestSetup::try_new_with_precreate_index(quickwit_container).await?;
+
+ let message_count = 11;
+ let sent_payloads = send_test_messages(&test_setup, message_count).await?;
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &sent_payloads).await?;
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn given_nonexistent_quickwit_index_should_create_and_store() -> Result<()> {
+ let test_setup = QuickwitTestSetup::try_new().await?;
+
+ let message_count = 13;
+ let sent_payloads = send_test_messages(&test_setup, message_count).await?;
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &sent_payloads).await?;
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn given_bulk_message_send_should_store() -> Result<()> {
+ let test_setup = QuickwitTestSetup::try_new().await?;
+
+ let message_count = 1000;
+ let sent_payloads = send_test_messages(&test_setup, message_count).await?;
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &sent_payloads).await?;
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn given_invalid_messages_should_not_store() -> Result<()> {
+ let test_setup = QuickwitTestSetup::try_new().await?;
+ let iggy_client = test_setup.runtime.create_client().await;
+
+    let first_valid = Bytes::from(serde_json::to_vec(&create_test_messages(1)[0])?);
+    let second_valid = Bytes::from(serde_json::to_vec(&create_test_messages(1)[0])?);
+
+ #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+ struct NotTestMessage {
+ not_a_test_message_field: f64,
+ }
+ let first_invalid = Bytes::from(serde_json::to_vec(&NotTestMessage {
+ not_a_test_message_field: 17.,
+ })?);
+
+ for (message_index, message_payload) in
+ [first_valid.clone(), first_invalid, second_valid.clone()]
+ .into_iter()
+ .enumerate()
+ {
+ iggy_client
+ .send_messages(&mut [IggyMessage::builder()
+ .id(message_index as u128 + 1)
+ .payload(message_payload)
+ .build()?])
+ .await;
+ }
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &[first_valid, second_valid])
+ .await?;
+
+ Ok(())
+}
diff --git a/core/integration/tests/connectors/quickwit/sinks/quickwit.toml b/core/integration/tests/connectors/quickwit/sinks/quickwit.toml
new file mode 100644
index 000000000..fab316524
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/sinks/quickwit.toml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "quickwit"
+enabled = true
+version = 0
+name = "Quickwit sink"
+path = "../../target/debug/libiggy_connector_quickwit_sink"
+verbose = false
+
+[[streams]]
+stream = ""
+topics = []
+schema = ""
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "quickwit_sink"
+
+[plugin_config]
+url = ""
+index = ""