This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_fix by this push:
     new 0ca56f99a Fix tests
0ca56f99a is described below

commit 0ca56f99a040d0d2cbc4b051671811c4495e16cf
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 18:24:57 2026 +0100

    Fix tests
---
 .../tests/connectors/elasticsearch/elasticsearch_sink.rs    |  6 +++---
 .../tests/connectors/elasticsearch/elasticsearch_source.rs  | 13 ++++++-------
 core/integration/tests/connectors/iceberg/iceberg_sink.rs   |  4 ++--
 core/integration/tests/connectors/stdout/stdout_sink.rs     |  6 +++---
 4 files changed, 14 insertions(+), 15 deletions(-)

diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
index 5e2113e76..7478a6e6b 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
@@ -35,7 +35,7 @@ async fn elasticsearch_sink_stores_json_messages(
     harness: &TestHarness,
     fixture: ElasticsearchSinkFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
@@ -93,7 +93,7 @@ async fn elasticsearch_sink_handles_bulk_messages(
     harness: &TestHarness,
     fixture: ElasticsearchSinkFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let bulk_count = 50;
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
@@ -147,7 +147,7 @@ async fn elasticsearch_sink_preserves_json_structure(
     harness: &TestHarness,
     fixture: ElasticsearchSinkFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index f421eeb35..eb9e3fa1d 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -34,7 +34,7 @@ async fn elasticsearch_source_produces_messages_to_iggy(
     harness: &TestHarness,
     fixture: ElasticsearchSourcePreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     fixture
         .insert_documents(TEST_MESSAGE_COUNT)
@@ -111,7 +111,7 @@ async fn elasticsearch_source_handles_empty_index(
     harness: &TestHarness,
     fixture: ElasticsearchSourcePreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
 
     let doc_count = fixture
         .get_document_count()
@@ -151,7 +151,7 @@ async fn elasticsearch_source_produces_bulk_messages(
     harness: &TestHarness,
     fixture: ElasticsearchSourcePreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let bulk_count = 10;
 
     fixture
@@ -213,11 +213,11 @@ async fn state_persists_across_connector_restart(
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
     let consumer_id: Identifier = "state_test_consumer".try_into().unwrap();
 
+    let client = harness.root_client().await.unwrap();
     let received_before = {
         let mut received: Vec<serde_json::Value> = Vec::new();
         for _ in 0..POLL_ATTEMPTS {
-            if let Ok(polled) = harness
-                .client()
+            if let Ok(polled) = client
                 .poll_messages(
                     &stream_id,
                     &topic_id,
@@ -274,8 +274,7 @@ async fn state_persists_across_connector_restart(
 
     let mut received_after: Vec<serde_json::Value> = Vec::new();
     for _ in 0..POLL_ATTEMPTS {
-        if let Ok(polled) = harness
-            .client()
+        if let Ok(polled) = client
             .poll_messages(
                 &stream_id,
                 &topic_id,
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs 
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 9dfd7a1d1..f4f553bc7 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -75,7 +75,7 @@ async fn iceberg_sink_consumes_json_messages(
     harness: &TestHarness,
     fixture: IcebergPreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
@@ -146,7 +146,7 @@ async fn iceberg_sink_handles_bulk_messages(
     harness: &TestHarness,
     fixture: IcebergPreCreatedFixture,
 ) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs 
b/core/integration/tests/connectors/stdout/stdout_sink.rs
index 783c30de5..b5fa268da 100644
--- a/core/integration/tests/connectors/stdout/stdout_sink.rs
+++ b/core/integration/tests/connectors/stdout/stdout_sink.rs
@@ -36,7 +36,7 @@ const API_KEY: &str = "test-api-key";
     seed = seeds::connector_stream
 )]
 async fn stdout_sink_consumes_messages(harness: &TestHarness) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
@@ -94,7 +94,7 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) 
{
     seed = seeds::connector_stream
 )]
 async fn stdout_sink_reports_metrics(harness: &TestHarness) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")
@@ -154,7 +154,7 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) 
{
     seed = seeds::connector_stream
 )]
 async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) {
-    let client = harness.client();
+    let client = harness.root_client().await.unwrap();
     let api_address = harness
         .connectors_runtime()
         .expect("connector runtime should be available")

Reply via email to