This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_fix by this push:
new 639963cfd Resolve comments
new d61a1a118 Merge branch 'connectors_fix' of github.com:apache/iggy into
connectors_fix
639963cfd is described below
commit 639963cfd4c7170d8d646ded3cccf3b22e9d7d81
Author: spetz <[email protected]>
AuthorDate: Tue Feb 10 09:45:07 2026 +0100
Resolve comments
---
core/common/src/lib.rs | 4 +---
core/connectors/sources/postgres_source/src/lib.rs | 8 +++++---
.../tests/connectors/fixtures/elasticsearch/sink.rs | 10 ++++++++--
.../tests/connectors/fixtures/elasticsearch/source.rs | 10 ++++++++--
.../integration/tests/connectors/fixtures/iceberg/container.rs | 2 +-
core/server/src/shard/system/consumer_offsets.rs | 3 +++
6 files changed, 26 insertions(+), 11 deletions(-)
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index f4fd672f3..58c857fa2 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -35,6 +35,7 @@ pub mod locking;
pub use alloc::buffer::PooledBuffer;
pub use alloc::memory_pool::{MEMORY_POOL, MemoryPool, MemoryPoolConfigOther,
memory_pool};
pub use certificates::generate_self_signed_certificate;
+pub use chrono::{DateTime, Duration as ChronoDuration, Utc};
pub use commands::consumer_groups::*;
pub use commands::consumer_offsets::*;
pub use commands::messages::*;
@@ -108,8 +109,5 @@ pub use
utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
pub use utils::random_id;
pub use utils::text;
pub use utils::timestamp::*;
-
-// Re-export chrono types for connectors and other crates that need DateTime
-pub use chrono::{DateTime, Duration as ChronoDuration, Utc};
pub use utils::topic_size::MaxTopicSize;
pub use utils::versioning::SemanticVersion;
diff --git a/core/connectors/sources/postgres_source/src/lib.rs
b/core/connectors/sources/postgres_source/src/lib.rs
index dee51de19..798c4b9e9 100644
--- a/core/connectors/sources/postgres_source/src/lib.rs
+++ b/core/connectors/sources/postgres_source/src/lib.rs
@@ -1396,9 +1396,11 @@ mod tests {
processed_rows: 1000,
};
- let serialized = serde_json::to_vec(&original).expect("Failed to
serialize");
- let deserialized: State =
- serde_json::from_slice(&serialized).expect("Failed to
deserialize");
+ let connector_state =
+ ConnectorState::serialize(&original, "test", 1).expect("Failed to
serialize state");
+ let deserialized: State = connector_state
+ .deserialize("test", 1)
+ .expect("Failed to deserialize state");
assert_eq!(original.last_poll_time, deserialized.last_poll_time);
assert_eq!(original.tracking_offsets, deserialized.tracking_offsets);
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
index 729497bc6..155250544 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
@@ -106,12 +106,18 @@ impl TestFixture for ElasticsearchSinkFixture {
&& response.status().is_success()
{
info!("Elasticsearch cluster is healthy");
- break;
+ return Ok(fixture);
}
sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
}
- Ok(fixture)
+ Err(TestBinaryError::FixtureSetup {
+ fixture_type: "ElasticsearchSink".to_string(),
+ message: format!(
+ "Failed to confirm Elasticsearch cluster health after {}
attempts",
+ HEALTH_CHECK_ATTEMPTS
+ ),
+ })
}
fn connectors_runtime_envs(&self) -> HashMap<String, String> {
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
index dcf7a457c..a70b9bd10 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
@@ -113,12 +113,18 @@ impl TestFixture for ElasticsearchSourceFixture {
&& response.status().is_success()
{
info!("Elasticsearch cluster is healthy");
- break;
+ return Ok(fixture);
}
sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
}
- Ok(fixture)
+ Err(TestBinaryError::FixtureSetup {
+ fixture_type: "ElasticsearchSource".to_string(),
+ message: format!(
+ "Failed to confirm Elasticsearch cluster health after {}
attempts",
+ HEALTH_CHECK_ATTEMPTS
+ ),
+ })
}
fn connectors_runtime_envs(&self) -> HashMap<String, String> {
diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs
b/core/integration/tests/connectors/fixtures/iceberg/container.rs
index 372212740..fd1c55fd5 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/container.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs
@@ -30,7 +30,7 @@ use tracing::info;
use uuid::Uuid;
const MINIO_IMAGE: &str = "minio/minio";
-const MINIO_TAG: &str = "latest";
+const MINIO_TAG: &str = "RELEASE.2025-09-07T16-13-09Z";
const MINIO_PORT: u16 = 9000;
const MINIO_CONSOLE_PORT: u16 = 9001;
const ICEBERG_REST_IMAGE: &str = "apache/iceberg-rest-fixture";
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index 6e3129515..7fa16ea5d 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -86,6 +86,9 @@ impl IggyShard {
(polling_consumer, partition_id)
}
ConsumerKind::ConsumerGroup => {
+ // Reading offsets doesn't require group membership — offsets
are stored
+ // per consumer group (not per member), so any client can
query the
+ // group's progress. Only store_consumer_offset enforces
membership.
let (_, _, cg_id) =
self.resolve_consumer_group_id(stream_id, topic_id,
&consumer.id)?;
let partition_id = partition_id.unwrap_or(0) as usize;