This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_fix by this push:
     new 639963cfd Resolve comments
     new d61a1a118 Merge branch 'connectors_fix' of github.com:apache/iggy into connectors_fix
639963cfd is described below

commit 639963cfd4c7170d8d646ded3cccf3b22e9d7d81
Author: spetz <[email protected]>
AuthorDate: Tue Feb 10 09:45:07 2026 +0100

    Resolve comments
---
 core/common/src/lib.rs                                         |  4 +---
 core/connectors/sources/postgres_source/src/lib.rs             |  8 +++++---
 .../tests/connectors/fixtures/elasticsearch/sink.rs            | 10 ++++++++--
 .../tests/connectors/fixtures/elasticsearch/source.rs          | 10 ++++++++--
 .../integration/tests/connectors/fixtures/iceberg/container.rs |  2 +-
 core/server/src/shard/system/consumer_offsets.rs               |  3 +++
 6 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index f4fd672f3..58c857fa2 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -35,6 +35,7 @@ pub mod locking;
 pub use alloc::buffer::PooledBuffer;
 pub use alloc::memory_pool::{MEMORY_POOL, MemoryPool, MemoryPoolConfigOther, memory_pool};
 pub use certificates::generate_self_signed_certificate;
+pub use chrono::{DateTime, Duration as ChronoDuration, Utc};
 pub use commands::consumer_groups::*;
 pub use commands::consumer_offsets::*;
 pub use commands::messages::*;
@@ -108,8 +109,5 @@ pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
 pub use utils::random_id;
 pub use utils::text;
 pub use utils::timestamp::*;
-
-// Re-export chrono types for connectors and other crates that need DateTime
-pub use chrono::{DateTime, Duration as ChronoDuration, Utc};
 pub use utils::topic_size::MaxTopicSize;
 pub use utils::versioning::SemanticVersion;
diff --git a/core/connectors/sources/postgres_source/src/lib.rs b/core/connectors/sources/postgres_source/src/lib.rs
index dee51de19..798c4b9e9 100644
--- a/core/connectors/sources/postgres_source/src/lib.rs
+++ b/core/connectors/sources/postgres_source/src/lib.rs
@@ -1396,9 +1396,11 @@ mod tests {
             processed_rows: 1000,
         };
 
-        let serialized = serde_json::to_vec(&original).expect("Failed to serialize");
-        let deserialized: State =
-            serde_json::from_slice(&serialized).expect("Failed to deserialize");
+        let connector_state =
+            ConnectorState::serialize(&original, "test", 1).expect("Failed to serialize state");
+        let deserialized: State = connector_state
+            .deserialize("test", 1)
+            .expect("Failed to deserialize state");
 
         assert_eq!(original.last_poll_time, deserialized.last_poll_time);
         assert_eq!(original.tracking_offsets, deserialized.tracking_offsets);
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
index 729497bc6..155250544 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
@@ -106,12 +106,18 @@ impl TestFixture for ElasticsearchSinkFixture {
                 && response.status().is_success()
             {
                 info!("Elasticsearch cluster is healthy");
-                break;
+                return Ok(fixture);
             }
             sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
         }
 
-        Ok(fixture)
+        Err(TestBinaryError::FixtureSetup {
+            fixture_type: "ElasticsearchSink".to_string(),
+            message: format!(
+                "Failed to confirm Elasticsearch cluster health after {} attempts",
+                HEALTH_CHECK_ATTEMPTS
+            ),
+        })
     }
 
     fn connectors_runtime_envs(&self) -> HashMap<String, String> {
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
index dcf7a457c..a70b9bd10 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
@@ -113,12 +113,18 @@ impl TestFixture for ElasticsearchSourceFixture {
                 && response.status().is_success()
             {
                 info!("Elasticsearch cluster is healthy");
-                break;
+                return Ok(fixture);
             }
             sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
         }
 
-        Ok(fixture)
+        Err(TestBinaryError::FixtureSetup {
+            fixture_type: "ElasticsearchSource".to_string(),
+            message: format!(
+                "Failed to confirm Elasticsearch cluster health after {} attempts",
+                HEALTH_CHECK_ATTEMPTS
+            ),
+        })
     }
 
     fn connectors_runtime_envs(&self) -> HashMap<String, String> {
diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs b/core/integration/tests/connectors/fixtures/iceberg/container.rs
index 372212740..fd1c55fd5 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/container.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs
@@ -30,7 +30,7 @@ use tracing::info;
 use uuid::Uuid;
 
 const MINIO_IMAGE: &str = "minio/minio";
-const MINIO_TAG: &str = "latest";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
 const MINIO_PORT: u16 = 9000;
 const MINIO_CONSOLE_PORT: u16 = 9001;
 const ICEBERG_REST_IMAGE: &str = "apache/iceberg-rest-fixture";
diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs
index 6e3129515..7fa16ea5d 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -86,6 +86,9 @@ impl IggyShard {
                 (polling_consumer, partition_id)
             }
             ConsumerKind::ConsumerGroup => {
+                // Reading offsets doesn't require group membership — offsets are stored
+                // per consumer group (not per member), so any client can query the
+                // group's progress. Only store_consumer_offset enforces membership.
                 let (_, _, cg_id) =
                     self.resolve_consumer_group_id(stream_id, topic_id, &consumer.id)?;
                 let partition_id = partition_id.unwrap_or(0) as usize;

Reply via email to