This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new d850ab6ff test(connectors): add integration tests for Quickwit sink (#2636)
d850ab6ff is described below

commit d850ab6ffd9c062eaaad6d1675ea3d7e7005e147
Author: JoshuaXOng <[email protected]>
AuthorDate: Sat Jan 31 03:33:28 2026 +1100

    test(connectors): add integration tests for Quickwit sink (#2636)
---
 Cargo.lock                                         |   4 +
 core/integration/Cargo.toml                        |   4 +
 core/integration/tests/connectors/mod.rs           |   1 +
 .../tests/connectors/quickwit/config.toml          |  20 ++
 core/integration/tests/connectors/quickwit/mod.rs  | 285 +++++++++++++++++++++
 .../tests/connectors/quickwit/quickwit_sink.rs     | 153 +++++++++++
 .../tests/connectors/quickwit/sinks/quickwit.toml  |  36 +++
 7 files changed, 503 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index fc1ce8c7f..dfe41e936 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4988,6 +4988,7 @@ name = "integration"
 version = "0.0.1"
 dependencies = [
  "ahash 0.8.12",
+ "anyhow",
  "assert_cmd",
  "async-trait",
  "bon",
@@ -5014,6 +5015,8 @@ dependencies = [
  "rand 0.9.2",
  "rcgen",
  "reqwest",
+ "reqwest-middleware",
+ "reqwest-retry",
  "rmcp",
  "serde",
  "serde_json",
@@ -5027,6 +5030,7 @@ dependencies = [
  "test-case",
  "testcontainers-modules",
  "tokio",
+ "tracing",
  "twox-hash",
  "uuid",
  "zip",
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 77b4a7d37..e0eef83e6 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -28,6 +28,7 @@ ci-qemu = []
 
 [dependencies]
 ahash = { workspace = true }
+anyhow = { workspace = true }
 assert_cmd = "2.1.2"
 async-trait = { workspace = true }
 bon = { workspace = true }
@@ -54,6 +55,8 @@ predicates = { workspace = true }
 rand = { workspace = true }
 rcgen = "0.14.7"
 reqwest = { workspace = true }
+reqwest-middleware = { workspace = true }
+reqwest-retry = { workspace = true }
 rmcp = { version = "0.14.0", features = [
     "client",
     "reqwest",
@@ -72,6 +75,7 @@ tempfile = { workspace = true }
 test-case = { workspace = true }
 testcontainers-modules = { version = "0.14.0", features = ["postgres"] }
 tokio = { workspace = true }
+tracing = { workspace = true }
 twox-hash = { workspace = true }
 uuid = { workspace = true }
 zip = { workspace = true }
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 28ca40809..621b36d4d 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -36,6 +36,7 @@ use std::collections::HashMap;
 mod api;
 mod http_config_provider;
 mod postgres;
+mod quickwit;
 mod random;
 
 pub const DEFAULT_TEST_STREAM: &str = "test_stream";
diff --git a/core/integration/tests/connectors/quickwit/config.toml b/core/integration/tests/connectors/quickwit/config.toml
new file mode 100644
index 000000000..7a2495592
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/config.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/quickwit/sinks"
diff --git a/core/integration/tests/connectors/quickwit/mod.rs b/core/integration/tests/connectors/quickwit/mod.rs
new file mode 100644
index 000000000..cfc1f0696
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/mod.rs
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod quickwit_sink;
+
+use anyhow::{Result, anyhow};
+use reqwest_middleware::ClientWithMiddleware as HttpClient;
+use reqwest_retry::RetryTransientMiddleware;
+use reqwest_retry::policies::ExponentialBackoff;
+use std::collections::HashMap;
+use tracing::info;
+use uuid::Uuid;
+
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt};
+
+use crate::connectors::{
+    ConnectorsRuntime, DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, IggySetup, setup_runtime,
+};
+
+mod quickwit_container {
+    pub static REPOSITORY: &str = "quickwit/quickwit";
+    pub static VERSION: &str = "0.8.1";
+    pub static LISTENING_IPV4: &str = "0.0.0.0";
+    pub static LISTENING_PORT: u16 = 7280;
+    pub static READY_MESSAGE: &str = "REST server is ready";
+}
+
+mod quickwit_paths {
+    pub static INDEXES: &str = "api/v1/indexes";
+
+    pub fn index_ingest(index_id: &str) -> String {
+        format!("api/v1/{index_id}/ingest")
+    }
+
+    pub fn index_search(index_id: &str) -> String {
+        format!("api/v1/{index_id}/search")
+    }
+}
+
+mod quickwit_responses {
+    use serde::Deserialize;
+
+    #[derive(Deserialize)]
+    pub struct IndexSearch {
+        pub hits: Vec<serde_json::Value>,
+        pub num_hits: usize,
+        #[allow(dead_code)]
+        pub elapsed_time_micros: usize,
+    }
+}
+
+struct QuickwitTestSetup {
+    quickwit: ContainerAsync<GenericImage>,
+    runtime: ConnectorsRuntime,
+    reqwest: HttpClient,
+}
+
+impl QuickwitTestSetup {
+    async fn try_new() -> Result<Self> {
+        let quickwit_container = start_quickwit_container().await;
+        let connectors_runtime = start_iggy_with_quickwit_connector(&quickwit_container).await?;
+        Ok(Self {
+            quickwit: quickwit_container,
+            runtime: connectors_runtime,
+            reqwest: get_http_client_with_retries(),
+        })
+    }
+
+    async fn try_new_with_precreate_index(
+        quickwit_container: ContainerAsync<GenericImage>,
+    ) -> Result<Self> {
+        let http_client = get_http_client_with_retries();
+        create_quickwit_test_index(&quickwit_container, &http_client).await?;
+        let connectors_runtime = start_iggy_with_quickwit_connector(&quickwit_container).await?;
+        Ok(Self {
+            quickwit: quickwit_container,
+            runtime: connectors_runtime,
+            reqwest: http_client,
+        })
+    }
+
+    async fn get_mapped_quickwit_port(&self) -> Result<u16> {
+        get_mapped_quickwit_port(&self.quickwit).await
+    }
+
+    async fn get_quickwit_test_index_all_search(&self) -> Result<quickwit_responses::IndexSearch> {
+        let match_all = "";
+        let descending = "-";
+        let search_response = self
+            .reqwest
+            .get(format!(
+                "http://localhost:{}/{}",
+                self.get_mapped_quickwit_port().await?,
+                quickwit_paths::index_search(DEFAULT_TEST_TOPIC)
+            ))
+            .query(&[("query", match_all)])
+            .query(&[("sort_by", format!(r#"{descending}{INDEX_TIMESTAMP_FIELD}"#))])
+            .send()
+            .await?;
+        info!("Received search index response.");
+        assert!(search_response.status().is_success());
+        search_response
+            .json::<quickwit_responses::IndexSearch>()
+            .await
+            .map_err(Into::into)
+    }
+
+    async fn flush_quickwit_test_index(&self) -> Result<()> {
+        let ingest_response = self
+            .reqwest
+            .post(format!(
+                "http://localhost:{}/{}",
+                self.get_mapped_quickwit_port().await?,
+                quickwit_paths::index_ingest(DEFAULT_TEST_TOPIC)
+            ))
+            .query(&[("commit", "force")])
+            .json("{}")
+            .send()
+            .await?;
+        info!("Received index ingest response.");
+        assert!(ingest_response.status().is_success());
+        Ok(())
+    }
+}
+
+fn get_http_client_with_retries() -> HttpClient {
+    let max_retries = 3;
+    let retry_policy = ExponentialBackoff::builder().build_with_max_retries(max_retries);
+    reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
+        .with(RetryTransientMiddleware::new_with_policy(retry_policy))
+        .build()
+}
+
+async fn get_mapped_quickwit_port(
+    quickwit_container: &ContainerAsync<GenericImage>,
+) -> Result<u16> {
+    let mapped_port = quickwit_container
+        .ports()
+        .await?
+        .map_to_host_port_ipv4(quickwit_container::LISTENING_PORT)
+        .ok_or(anyhow!("No mapping for Quickwit port."))?;
+    info!("Got ports details from container.");
+    Ok(mapped_port)
+}
+
+static INDEX_TIMESTAMP_FIELD: &str = "timestamp";
+
+fn get_quickwit_index_config() -> String {
+    format!(
+        r#"
+    version: 0.8
+    index_id: {DEFAULT_TEST_TOPIC}
+    doc_mapping:
+      mode: strict
+      field_mappings:
+        - name: id
+          type: u64
+        - name: name
+          type: text
+          tokenizer: raw
+        - name: count
+          type: u64
+        - name: amount
+          type: f64
+        - name: active
+          type: bool
+        - name: {INDEX_TIMESTAMP_FIELD}
+          type: datetime
+          input_formats:
+            - unix_timestamp
+          output_format: unix_timestamp_micros
+          fast_precision: microseconds
+          fast: true
+      timestamp_field: timestamp
+    retention:
+      period: 7 days
+      schedule: daily
+    "#
+    )
+}
+
+const STREAMS_KEY_PREFIX: &str = "IGGY_CONNECTORS_SINK_QUICKWIT_STREAMS";
+
+fn get_stream_0_overrides() -> HashMap<String, String> {
+    HashMap::from([
+        (
+            format!("{STREAMS_KEY_PREFIX}_0_STREAM"),
+            DEFAULT_TEST_STREAM.to_string(),
+        ),
+        (
+            format!("{STREAMS_KEY_PREFIX}_0_TOPICS"),
+            format!("[{DEFAULT_TEST_TOPIC}]"),
+        ),
+        (format!("{STREAMS_KEY_PREFIX}_0_SCHEMA"), "json".to_string()),
+    ])
+}
+
+async fn create_quickwit_test_index(
+    quickwit_container: &ContainerAsync<GenericImage>,
+    http_client: &HttpClient,
+) -> Result<()> {
+    let create_response = http_client
+        .post(format!(
+            "http://localhost:{}/{}",
+            get_mapped_quickwit_port(quickwit_container).await?,
+            quickwit_paths::INDEXES
+        ))
+        .header("Content-Type", "application/yaml")
+        .body(get_quickwit_index_config())
+        .send()
+        .await?;
+    info!("Received create index response.");
+    assert!(create_response.status().is_success());
+    Ok(())
+}
+
+static NETWORK_NAME_PREFIX: &str = "iggy-quickwit-sink";
+
+async fn start_quickwit_container() -> ContainerAsync<GenericImage> {
+    let unique_network = format!("{NETWORK_NAME_PREFIX}-{}", Uuid::new_v4());
+    let random_host = 0;
+    let quickwit_container =
+        GenericImage::new(quickwit_container::REPOSITORY, quickwit_container::VERSION)
+            .with_exposed_port(random_host.tcp())
+            .with_wait_for(WaitFor::message_on_stdout(
+                quickwit_container::READY_MESSAGE,
+            ))
+            .with_network(unique_network)
+            .with_cmd(["run"])
+            .with_env_var("QW_LISTEN_ADDRESS", quickwit_container::LISTENING_IPV4)
+            .with_mapped_port(random_host, quickwit_container::LISTENING_PORT.tcp())
+            .start()
+            .await
+            .expect("Quickwit started.");
+    info!("Started quickwit container.");
+    quickwit_container
+}
+
+const PLUGIN_KEY_PREFIX: &str = "IGGY_CONNECTORS_SINK_QUICKWIT_PLUGIN_CONFIG";
+
+async fn start_iggy_with_quickwit_connector(
+    quickwit_container: &ContainerAsync<GenericImage>,
+) -> Result<ConnectorsRuntime> {
+    let mapped_port = get_mapped_quickwit_port(quickwit_container).await?;
+
+    let plugin_overrides = HashMap::from([
+        (
+            format!("{PLUGIN_KEY_PREFIX}_URL"),
+            format!("http://localhost:{mapped_port}"),
+        ),
+        (
+            format!("{PLUGIN_KEY_PREFIX}_INDEX"),
+            get_quickwit_index_config(),
+        ),
+    ]);
+    let mut extra_envs = HashMap::new();
+    extra_envs.extend(plugin_overrides);
+    extra_envs.extend(get_stream_0_overrides());
+
+    let mut connectors_runtime = setup_runtime();
+    let runtime_config = "quickwit/config.toml";
+    connectors_runtime
+        .init(runtime_config, Some(extra_envs), IggySetup::default())
+        .await;
+
+    Ok(connectors_runtime)
+}
diff --git a/core/integration/tests/connectors/quickwit/quickwit_sink.rs b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
new file mode 100644
index 000000000..5c8e02baa
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use anyhow::Result;
+use bytes::Bytes;
+use iggy_common::IggyMessage;
+use serde::{Deserialize, Serialize};
+
+use crate::connectors::create_test_messages;
+use crate::connectors::quickwit::{QuickwitTestSetup, start_quickwit_container};
+
+async fn send_test_messages(
+    test_setup: &QuickwitTestSetup,
+    message_count: usize,
+) -> Result<Vec<Bytes>> {
+    let iggy_client = test_setup.runtime.create_client().await;
+
+    let message_payloads = create_test_messages(message_count)
+        .iter()
+        .map(|message| {
+            serde_json::to_vec(message)
+                .map(Bytes::from)
+                .map_err(Into::into)
+        })
+        .collect::<Result<Vec<Bytes>>>()?;
+
+    let mut test_messages = message_payloads
+        .iter()
+        .cloned()
+        .enumerate()
+        .map(|(i, message)| {
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(message)
+                .build()
+                .map_err(Into::into)
+        })
+        .collect::<Result<Vec<IggyMessage>>>()?;
+    iggy_client.send_messages(&mut test_messages).await;
+
+    Ok(message_payloads)
+}
+
+async fn assert_test_index_documents_match_message_payloads(
+    test_setup: &QuickwitTestSetup,
+    message_payloads: &[Bytes],
+) -> Result<()> {
+    test_setup.flush_quickwit_test_index().await?;
+
+    let search_response = test_setup.get_quickwit_test_index_all_search().await?;
+    assert_eq!(search_response.num_hits, message_payloads.len());
+
+    for (quickwit_hit, message_payload) in search_response
+        .hits
+        .into_iter()
+        .zip(message_payloads.iter())
+    {
+        assert_eq!(
+            quickwit_hit,
+            serde_json::from_slice::<serde_json::Value>(message_payload)?
+        );
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn given_existent_quickwit_index_should_store() -> Result<()> {
+    let quickwit_container = start_quickwit_container().await;
+    let test_setup = QuickwitTestSetup::try_new_with_precreate_index(quickwit_container).await?;
+
+    let message_count = 11;
+    let sent_payloads = send_test_messages(&test_setup, message_count).await?;
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &sent_payloads).await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn given_nonexistent_quickwit_index_should_create_and_store() -> Result<()> {
+    let test_setup = QuickwitTestSetup::try_new().await?;
+
+    let message_count = 13;
+    let sent_payloads = send_test_messages(&test_setup, message_count).await?;
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &sent_payloads).await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn given_bulk_message_send_should_store() -> Result<()> {
+    let test_setup = QuickwitTestSetup::try_new().await?;
+
+    let message_count = 1000;
+    let sent_payloads = send_test_messages(&test_setup, message_count).await?;
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &sent_payloads).await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn given_invalid_messages_should_not_store() -> Result<()> {
+    let test_setup = QuickwitTestSetup::try_new().await?;
+    let iggy_client = test_setup.runtime.create_client().await;
+
+    let first_valid = Bytes::from(serde_json::to_vec(&create_test_messages(1)[0])?);
+    let second_valid = Bytes::from(serde_json::to_vec(&create_test_messages(1)[0])?);
+
+    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+    struct NotTestMessage {
+        not_a_test_message_field: f64,
+    }
+    let first_invalid = Bytes::from(serde_json::to_vec(&NotTestMessage {
+        not_a_test_message_field: 17.,
+    })?);
+
+    for (message_index, message_payload) in
+        [first_valid.clone(), first_invalid, second_valid.clone()]
+            .into_iter()
+            .enumerate()
+    {
+        iggy_client
+            .send_messages(&mut [IggyMessage::builder()
+                .id(message_index as u128 + 1)
+                .payload(message_payload)
+                .build()?])
+            .await;
+    }
+
+    assert_test_index_documents_match_message_payloads(&test_setup, &[first_valid, second_valid])
+        .await?;
+
+    Ok(())
+}
diff --git a/core/integration/tests/connectors/quickwit/sinks/quickwit.toml b/core/integration/tests/connectors/quickwit/sinks/quickwit.toml
new file mode 100644
index 000000000..fab316524
--- /dev/null
+++ b/core/integration/tests/connectors/quickwit/sinks/quickwit.toml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "quickwit"
+enabled = true
+version = 0
+name = "Quickwit sink"
+path = "../../target/debug/libiggy_connector_quickwit_sink"
+verbose = false
+
+[[streams]]
+stream = ""
+topics = []
+schema = ""
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "quickwit_sink"
+
+[plugin_config]
+url = ""
+index = ""

Reply via email to