This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_fix by this push:
new 0ca56f99a Fix tests
0ca56f99a is described below
commit 0ca56f99a040d0d2cbc4b051671811c4495e16cf
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 18:24:57 2026 +0100
Fix tests
---
.../tests/connectors/elasticsearch/elasticsearch_sink.rs | 6 +++---
.../tests/connectors/elasticsearch/elasticsearch_source.rs | 13 ++++++-------
core/integration/tests/connectors/iceberg/iceberg_sink.rs | 4 ++--
core/integration/tests/connectors/stdout/stdout_sink.rs | 6 +++---
4 files changed, 14 insertions(+), 15 deletions(-)
diff --git a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
index 5e2113e76..7478a6e6b 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
@@ -35,7 +35,7 @@ async fn elasticsearch_sink_stores_json_messages(
harness: &TestHarness,
fixture: ElasticsearchSinkFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
@@ -93,7 +93,7 @@ async fn elasticsearch_sink_handles_bulk_messages(
harness: &TestHarness,
fixture: ElasticsearchSinkFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let bulk_count = 50;
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
@@ -147,7 +147,7 @@ async fn elasticsearch_sink_preserves_json_structure(
harness: &TestHarness,
fixture: ElasticsearchSinkFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
diff --git a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index f421eeb35..eb9e3fa1d 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -34,7 +34,7 @@ async fn elasticsearch_source_produces_messages_to_iggy(
harness: &TestHarness,
fixture: ElasticsearchSourcePreCreatedFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
fixture
.insert_documents(TEST_MESSAGE_COUNT)
@@ -111,7 +111,7 @@ async fn elasticsearch_source_handles_empty_index(
harness: &TestHarness,
fixture: ElasticsearchSourcePreCreatedFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let doc_count = fixture
.get_document_count()
@@ -151,7 +151,7 @@ async fn elasticsearch_source_produces_bulk_messages(
harness: &TestHarness,
fixture: ElasticsearchSourcePreCreatedFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let bulk_count = 10;
fixture
@@ -213,11 +213,11 @@ async fn state_persists_across_connector_restart(
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
let consumer_id: Identifier = "state_test_consumer".try_into().unwrap();
+ let client = harness.root_client().await.unwrap();
let received_before = {
let mut received: Vec<serde_json::Value> = Vec::new();
for _ in 0..POLL_ATTEMPTS {
- if let Ok(polled) = harness
- .client()
+ if let Ok(polled) = client
.poll_messages(
&stream_id,
&topic_id,
@@ -274,8 +274,7 @@ async fn state_persists_across_connector_restart(
let mut received_after: Vec<serde_json::Value> = Vec::new();
for _ in 0..POLL_ATTEMPTS {
- if let Ok(polled) = harness
- .client()
+ if let Ok(polled) = client
.poll_messages(
&stream_id,
&topic_id,
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 9dfd7a1d1..f4f553bc7 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -75,7 +75,7 @@ async fn iceberg_sink_consumes_json_messages(
harness: &TestHarness,
fixture: IcebergPreCreatedFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let api_address = harness
.connectors_runtime()
.expect("connector runtime should be available")
@@ -146,7 +146,7 @@ async fn iceberg_sink_handles_bulk_messages(
harness: &TestHarness,
fixture: IcebergPreCreatedFixture,
) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let api_address = harness
.connectors_runtime()
.expect("connector runtime should be available")
diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs b/core/integration/tests/connectors/stdout/stdout_sink.rs
index 783c30de5..b5fa268da 100644
--- a/core/integration/tests/connectors/stdout/stdout_sink.rs
+++ b/core/integration/tests/connectors/stdout/stdout_sink.rs
@@ -36,7 +36,7 @@ const API_KEY: &str = "test-api-key";
seed = seeds::connector_stream
)]
async fn stdout_sink_consumes_messages(harness: &TestHarness) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let api_address = harness
.connectors_runtime()
.expect("connector runtime should be available")
@@ -94,7 +94,7 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) {
seed = seeds::connector_stream
)]
async fn stdout_sink_reports_metrics(harness: &TestHarness) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let api_address = harness
.connectors_runtime()
.expect("connector runtime should be available")
@@ -154,7 +154,7 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) {
seed = seeds::connector_stream
)]
async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) {
- let client = harness.client();
+ let client = harness.root_client().await.unwrap();
let api_address = harness
.connectors_runtime()
.expect("connector runtime should be available")