This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_fix by this push:
     new d86f4bf13 Improve elastic test fixture with retries
d86f4bf13 is described below

commit d86f4bf1325dd8639305ad98ff7f076567a3b9e1
Author: spetz <[email protected]>
AuthorDate: Tue Feb 10 08:23:42 2026 +0100

    Improve elastic test fixture with retries
---
 .../connectors/fixtures/elasticsearch/container.rs |  3 +++
 .../connectors/fixtures/elasticsearch/sink.rs      |  7 ++---
 .../connectors/fixtures/elasticsearch/source.rs    | 31 +++++++++++++++-------
 3 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
index 999707fd9..1117ac77b 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
@@ -33,6 +33,9 @@ const ELASTICSEARCH_TAG: &str = "9.3.0";
 const ELASTICSEARCH_PORT: u16 = 9200;
 const ELASTICSEARCH_READY_MSG: &str = "started";
 
+pub const HEALTH_CHECK_ATTEMPTS: usize = 30;
+pub const HEALTH_CHECK_INTERVAL_MS: u64 = 500;
+
 pub const DEFAULT_TEST_STREAM: &str = "test_stream";
 pub const DEFAULT_TEST_TOPIC: &str = "test_topic";
 
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
index 1b7e9b550..729497bc6 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
@@ -21,7 +21,8 @@ use super::container::{
     DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_INDEX, ENV_SINK_PATH,
     ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, 
ENV_SINK_STREAMS_0_STREAM,
     ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_URL, ElasticsearchContainer, 
ElasticsearchOps,
-    ElasticsearchSearchResponse, create_http_client,
+    ElasticsearchSearchResponse, HEALTH_CHECK_ATTEMPTS, 
HEALTH_CHECK_INTERVAL_MS,
+    create_http_client,
 };
 use async_trait::async_trait;
 use integration::harness::{TestBinaryError, TestFixture};
@@ -99,7 +100,7 @@ impl TestFixture for ElasticsearchSinkFixture {
             http_client,
         };
 
-        for _ in 0..30 {
+        for _ in 0..HEALTH_CHECK_ATTEMPTS {
             let url = format!("{}/_cluster/health", 
fixture.container.base_url);
             if let Ok(response) = fixture.http_client.get(&url).send().await
                 && response.status().is_success()
@@ -107,7 +108,7 @@ impl TestFixture for ElasticsearchSinkFixture {
                 info!("Elasticsearch cluster is healthy");
                 break;
             }
-            sleep(Duration::from_millis(500)).await;
+            sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
         }
 
         Ok(fixture)
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
index 16535976c..dcf7a457c 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
@@ -21,13 +21,17 @@ use super::container::{
     DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE, 
ENV_SOURCE_INDEX,
     ENV_SOURCE_PATH, ENV_SOURCE_POLLING_INTERVAL, ENV_SOURCE_STREAMS_0_SCHEMA,
     ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC, 
ENV_SOURCE_TIMESTAMP_FIELD,
-    ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps, 
create_http_client,
+    ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps, 
HEALTH_CHECK_ATTEMPTS,
+    HEALTH_CHECK_INTERVAL_MS, create_http_client,
 };
 use async_trait::async_trait;
 use iggy_common::IggyTimestamp;
 use integration::harness::{TestBinaryError, TestFixture};
 use reqwest_middleware::ClientWithMiddleware as HttpClient;
 use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::info;
 
 const TEST_INDEX: &str = "test_documents";
 
@@ -98,10 +102,23 @@ impl TestFixture for ElasticsearchSourceFixture {
         let container = ElasticsearchContainer::start().await?;
         let http_client = create_http_client();
 
-        Ok(Self {
+        let fixture = Self {
             container,
             http_client,
-        })
+        };
+
+        for _ in 0..HEALTH_CHECK_ATTEMPTS {
+            let url = format!("{}/_cluster/health", 
fixture.container.base_url);
+            if let Ok(response) = fixture.http_client.get(&url).send().await
+                && response.status().is_success()
+            {
+                info!("Elasticsearch cluster is healthy");
+                break;
+            }
+            sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
+        }
+
+        Ok(fixture)
     }
 
     fn connectors_runtime_envs(&self) -> HashMap<String, String> {
@@ -156,13 +173,7 @@ impl ElasticsearchOps for ElasticsearchSourcePreCreatedFixture {
 #[async_trait]
 impl TestFixture for ElasticsearchSourcePreCreatedFixture {
     async fn setup() -> Result<Self, TestBinaryError> {
-        let container = ElasticsearchContainer::start().await?;
-        let http_client = create_http_client();
-
-        let inner = ElasticsearchSourceFixture {
-            container,
-            http_client,
-        };
+        let inner = ElasticsearchSourceFixture::setup().await?;
 
         inner.setup_index().await?;
 

Reply via email to