This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_fix by this push:
new d86f4bf13 Improve elastic test fixture with retries
d86f4bf13 is described below
commit d86f4bf1325dd8639305ad98ff7f076567a3b9e1
Author: spetz <[email protected]>
AuthorDate: Tue Feb 10 08:23:42 2026 +0100
Improve elastic test fixture with retries
---
.../connectors/fixtures/elasticsearch/container.rs | 3 +++
.../connectors/fixtures/elasticsearch/sink.rs | 7 ++---
.../connectors/fixtures/elasticsearch/source.rs | 31 +++++++++++++++-------
3 files changed, 28 insertions(+), 13 deletions(-)
diff --git
a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
index 999707fd9..1117ac77b 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
@@ -33,6 +33,9 @@ const ELASTICSEARCH_TAG: &str = "9.3.0";
const ELASTICSEARCH_PORT: u16 = 9200;
const ELASTICSEARCH_READY_MSG: &str = "started";
+pub const HEALTH_CHECK_ATTEMPTS: usize = 30;
+pub const HEALTH_CHECK_INTERVAL_MS: u64 = 500;
+
pub const DEFAULT_TEST_STREAM: &str = "test_stream";
pub const DEFAULT_TEST_TOPIC: &str = "test_topic";
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
index 1b7e9b550..729497bc6 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
@@ -21,7 +21,8 @@ use super::container::{
DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_INDEX, ENV_SINK_PATH,
ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA,
ENV_SINK_STREAMS_0_STREAM,
ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_URL, ElasticsearchContainer,
ElasticsearchOps,
- ElasticsearchSearchResponse, create_http_client,
+ ElasticsearchSearchResponse, HEALTH_CHECK_ATTEMPTS,
HEALTH_CHECK_INTERVAL_MS,
+ create_http_client,
};
use async_trait::async_trait;
use integration::harness::{TestBinaryError, TestFixture};
@@ -99,7 +100,7 @@ impl TestFixture for ElasticsearchSinkFixture {
http_client,
};
- for _ in 0..30 {
+ for _ in 0..HEALTH_CHECK_ATTEMPTS {
let url = format!("{}/_cluster/health",
fixture.container.base_url);
if let Ok(response) = fixture.http_client.get(&url).send().await
&& response.status().is_success()
@@ -107,7 +108,7 @@ impl TestFixture for ElasticsearchSinkFixture {
info!("Elasticsearch cluster is healthy");
break;
}
- sleep(Duration::from_millis(500)).await;
+ sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
}
Ok(fixture)
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
index 16535976c..dcf7a457c 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
@@ -21,13 +21,17 @@ use super::container::{
DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE,
ENV_SOURCE_INDEX,
ENV_SOURCE_PATH, ENV_SOURCE_POLLING_INTERVAL, ENV_SOURCE_STREAMS_0_SCHEMA,
ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC,
ENV_SOURCE_TIMESTAMP_FIELD,
- ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps,
create_http_client,
+ ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps,
HEALTH_CHECK_ATTEMPTS,
+ HEALTH_CHECK_INTERVAL_MS, create_http_client,
};
use async_trait::async_trait;
use iggy_common::IggyTimestamp;
use integration::harness::{TestBinaryError, TestFixture};
use reqwest_middleware::ClientWithMiddleware as HttpClient;
use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::info;
const TEST_INDEX: &str = "test_documents";
@@ -98,10 +102,23 @@ impl TestFixture for ElasticsearchSourceFixture {
let container = ElasticsearchContainer::start().await?;
let http_client = create_http_client();
- Ok(Self {
+ let fixture = Self {
container,
http_client,
- })
+ };
+
+ for _ in 0..HEALTH_CHECK_ATTEMPTS {
+ let url = format!("{}/_cluster/health",
fixture.container.base_url);
+ if let Ok(response) = fixture.http_client.get(&url).send().await
+ && response.status().is_success()
+ {
+ info!("Elasticsearch cluster is healthy");
+ break;
+ }
+ sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
+ }
+
+ Ok(fixture)
}
fn connectors_runtime_envs(&self) -> HashMap<String, String> {
@@ -156,13 +173,7 @@ impl ElasticsearchOps for
ElasticsearchSourcePreCreatedFixture {
#[async_trait]
impl TestFixture for ElasticsearchSourcePreCreatedFixture {
async fn setup() -> Result<Self, TestBinaryError> {
- let container = ElasticsearchContainer::start().await?;
- let http_client = create_http_client();
-
- let inner = ElasticsearchSourceFixture {
- container,
- http_client,
- };
+ let inner = ElasticsearchSourceFixture::setup().await?;
inner.setup_index().await?;