This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/connectors_fix by this push:
new d13f09333 fix consumer group recovery after runtime restart
d13f09333 is described below
commit d13f093337145301683b2ab42514a95a0eeeb16d
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 18:15:41 2026 +0100
fix consumer group recovery after runtime restart
---
core/integration/tests/server/cg.rs | 4 +-
core/integration/tests/server/mod.rs | 11 +-
...er_group_new_messages_after_restart_scenario.rs | 193 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/server/src/metadata/absorb.rs | 14 ++
core/server/src/metadata/ops.rs | 1 +
core/server/src/metadata/writer.rs | 12 ++
core/server/src/shard/system/consumer_groups.rs | 50 +++++-
core/server/src/shard/system/consumer_offsets.rs | 32 ++--
core/server/src/shard/system/messages.rs | 6 -
10 files changed, 304 insertions(+), 20 deletions(-)
diff --git a/core/integration/tests/server/cg.rs b/core/integration/tests/server/cg.rs
index bf12c871e..0a2b343c0 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -17,7 +17,8 @@
use crate::server::{
ScenarioFn, auto_commit_reconnection_scenario, join_scenario,
multiple_clients_scenario,
- offset_cleanup_scenario, run_scenario, single_client_scenario,
+ new_messages_after_restart_scenario, offset_cleanup_scenario, run_scenario,
+ single_client_scenario,
};
use iggy_common::TransportProtocol;
use serial_test::parallel;
@@ -40,6 +41,7 @@ fn websocket() -> TransportProtocol {
single_client_scenario(),
multiple_clients_scenario(),
auto_commit_reconnection_scenario(),
+ new_messages_after_restart_scenario(),
offset_cleanup_scenario(),
]
)]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 376746436..10340935e 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -34,7 +34,8 @@ use integration::{
};
use scenarios::{
authentication_scenario, bench_scenario,
consumer_group_auto_commit_reconnection_scenario,
- consumer_group_join_scenario, consumer_group_offset_cleanup_scenario,
consumer_group_join_scenario, consumer_group_new_messages_after_restart_scenario,
+ consumer_group_offset_cleanup_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
consumer_group_with_single_client_polling_messages_scenario,
consumer_timestamp_polling_scenario, create_message_payload,
message_headers_scenario,
@@ -90,6 +91,14 @@ fn auto_commit_reconnection_scenario() -> ScenarioFn {
}
}
+fn new_messages_after_restart_scenario() -> ScenarioFn {
+ |factory| {
+ Box::pin(consumer_group_new_messages_after_restart_scenario::run(
+ factory,
+ ))
+ }
+}
+
fn offset_cleanup_scenario() -> ScenarioFn {
|factory| Box::pin(consumer_group_offset_cleanup_scenario::run(factory))
}
diff --git a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
new file mode 100644
index 000000000..3b219a3bc
--- /dev/null
+++ b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
@@ -0,0 +1,193 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{
+ CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client,
+};
+use futures::StreamExt;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep, timeout};
+
+const INITIAL_MESSAGES_COUNT: u32 = 10;
+const NEW_MESSAGES_COUNT: u32 = 5;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = create_client(client_factory).await;
+ login_root(&client).await;
+ init_system(&client).await;
+ execute_scenario(client_factory, &client).await;
+}
+
+async fn init_system(client: &IggyClient) {
+ client.create_stream(STREAM_NAME).await.unwrap();
+
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ client
+ .create_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ CONSUMER_GROUP_NAME,
+ )
+ .await
+ .unwrap();
+}
+
+async fn execute_scenario(client_factory: &dyn ClientFactory, client: &IggyClient) {
+ // 1. Produce initial messages
+ produce_messages(client, 1, INITIAL_MESSAGES_COUNT).await;
+
+ // 2. Create a separate client to simulate the runtime
+ let runtime_client = create_client(client_factory).await;
+ login_root(&runtime_client).await;
+
+ // 3. Create consumer and consume all initial messages
+ let mut consumer = create_consumer(&runtime_client).await;
+    let consumed_messages = consume_messages(&mut consumer, INITIAL_MESSAGES_COUNT).await;
+ assert_eq!(
+ consumed_messages.len(),
+ INITIAL_MESSAGES_COUNT as usize,
+ "Should consume all initial messages"
+ );
+
+ for (index, message) in consumed_messages.iter().enumerate() {
+ let expected_payload = format!("test_message_{}", index + 1);
+ let actual_payload = String::from_utf8_lossy(&message.payload);
+ assert_eq!(
+ actual_payload, expected_payload,
+ "Message content mismatch at index {index}"
+ );
+ }
+
+ // 4. Wait for auto-commit to process
+ sleep(Duration::from_secs(2)).await;
+
+ // 5. Disconnect the consumer and client (simulating runtime restart)
+ drop(consumer);
+ runtime_client.disconnect().await.unwrap();
+ drop(runtime_client);
+ sleep(Duration::from_millis(500)).await;
+
+ // 6. Send new messages after consumer disconnected
+ produce_messages(
+ client,
+ INITIAL_MESSAGES_COUNT + 1,
+ INITIAL_MESSAGES_COUNT + NEW_MESSAGES_COUNT,
+ )
+ .await;
+
+ // 7. Create a new client (simulating runtime restart)
+ let new_runtime_client = create_client(client_factory).await;
+ login_root(&new_runtime_client).await;
+
+ // 8. Reconnect consumer and consume new messages
+ let mut consumer = create_consumer(&new_runtime_client).await;
+    let new_messages = consume_messages(&mut consumer, NEW_MESSAGES_COUNT).await;
+ assert_eq!(
+ new_messages.len(),
+ NEW_MESSAGES_COUNT as usize,
+ "Should receive all new messages sent after restart"
+ );
+
+ for (index, message) in new_messages.iter().enumerate() {
+ let expected_payload =
+            format!("test_message_{}", INITIAL_MESSAGES_COUNT + 1 + index as u32);
+ let actual_payload = String::from_utf8_lossy(&message.payload);
+ assert_eq!(
+ actual_payload, expected_payload,
+ "New message content mismatch at index {index}"
+ );
+ }
+
+ drop(consumer);
+ drop(new_runtime_client);
+}
+
+async fn produce_messages(client: &IggyClient, start_id: u32, end_id: u32) {
+ let mut messages = Vec::new();
+ for message_id in start_id..=end_id {
+ let payload = format!("test_message_{message_id}");
+ let message = IggyMessage::from_str(&payload).unwrap();
+ messages.push(message);
+ }
+
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+}
+
+async fn create_consumer(client: &IggyClient) -> IggyConsumer {
+ let mut consumer = client
+ .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .batch_length(10)
+ .poll_interval(IggyDuration::from_str("100ms").unwrap())
+ .polling_strategy(PollingStrategy::next())
+ .auto_join_consumer_group()
+ .create_consumer_group_if_not_exists()
+ .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+ .build();
+
+ consumer.init().await.unwrap();
+ consumer
+}
+
+async fn consume_messages(consumer: &mut IggyConsumer, expected_count: u32) -> Vec<IggyMessage> {
+ let mut messages = Vec::new();
+ let mut count = 0;
+
+ let result = timeout(Duration::from_secs(30), async {
+ while count < expected_count {
+ if let Some(message_result) = consumer.next().await {
+ match message_result {
+ Ok(polled_message) => {
+ messages.push(polled_message.message);
+ count += 1;
+ }
+                    Err(error) => panic!("Error while consuming messages: {error}"),
+ }
+ }
+ }
+ })
+ .await;
+
+ if result.is_err() {
+        panic!("Timeout waiting for messages. Expected {expected_count}, received {count}");
+ }
+
+ messages
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 90d45676f..c376d8648 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -21,6 +21,7 @@ pub mod bench_scenario;
pub mod concurrent_scenario;
pub mod consumer_group_auto_commit_reconnection_scenario;
pub mod consumer_group_join_scenario;
+pub mod consumer_group_new_messages_after_restart_scenario;
pub mod consumer_group_offset_cleanup_scenario;
pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
pub mod consumer_group_with_single_client_polling_messages_scenario;
diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs
index 36159dfdc..b541b724c 100644
--- a/core/server/src/metadata/absorb.rs
+++ b/core/server/src/metadata/absorb.rs
@@ -284,11 +284,25 @@ fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) {
group_id,
client_id,
member_id,
+ valid_client_ids,
} => {
if let Some(stream) = metadata.streams.get_mut(*stream_id)
&& let Some(topic) = stream.topics.get_mut(*topic_id)
&& let Some(group) = topic.consumer_groups.get_mut(*group_id)
{
+ if let Some(valid_ids) = valid_client_ids {
+ let stale_members: Vec<usize> = group
+ .members
+ .iter()
+ .filter(|(_, m)| !valid_ids.contains(&m.client_id))
+ .map(|(slot_id, _)| slot_id)
+ .collect();
+
+ for slot_id in stale_members {
+ group.members.remove(slot_id);
+ }
+ }
+
let next_id = group
.members
.iter()
diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs
index f489b7309..91e6a0654 100644
--- a/core/server/src/metadata/ops.rs
+++ b/core/server/src/metadata/ops.rs
@@ -114,6 +114,7 @@ pub enum MetadataOp {
group_id: ConsumerGroupId,
client_id: u32,
member_id: Arc<AtomicUsize>,
+ valid_client_ids: Option<Vec<u32>>,
},
LeaveConsumerGroup {
stream_id: StreamId,
diff --git a/core/server/src/metadata/writer.rs b/core/server/src/metadata/writer.rs
index ca97842b1..30f6dad23 100644
--- a/core/server/src/metadata/writer.rs
+++ b/core/server/src/metadata/writer.rs
@@ -263,6 +263,17 @@ impl MetadataWriter {
topic_id: TopicId,
group_id: ConsumerGroupId,
client_id: u32,
+ ) -> Option<usize> {
+        self.join_consumer_group_with_cleanup(stream_id, topic_id, group_id, client_id, None)
+ }
+
+ pub fn join_consumer_group_with_cleanup(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ client_id: u32,
+ valid_client_ids: Option<Vec<u32>>,
) -> Option<usize> {
let member_id = Arc::new(AtomicUsize::new(usize::MAX));
self.append(MetadataOp::JoinConsumerGroup {
@@ -271,6 +282,7 @@ impl MetadataWriter {
group_id,
client_id,
member_id: member_id.clone(),
+ valid_client_ids,
});
self.publish();
let id = member_id.load(Ordering::Acquire);
diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs
index 967eb541a..a2d971049 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -103,8 +103,54 @@ impl IggyShard {
let (stream, topic, group) =
self.resolve_consumer_group_id(stream_id, topic_id, group_id)?;
- self.writer()
- .join_consumer_group(stream, topic, group, client_id);
+ let valid_client_ids: Vec<u32> = self
+ .client_manager
+ .get_clients()
+ .iter()
+ .map(|c| c.session.client_id)
+ .collect();
+
+ self.writer().join_consumer_group_with_cleanup(
+ stream,
+ topic,
+ group,
+ client_id,
+ Some(valid_client_ids),
+ );
+
+        if let Some(cg) = self.metadata.get_consumer_group(stream, topic, group)
+            && let Some((_, member)) = cg.members.iter().find(|(_, m)| m.client_id == client_id)
+ && member.partitions.is_empty()
+ && !cg.partitions.is_empty()
+ {
+ let current_valid_ids: Vec<u32> = self
+ .client_manager
+ .get_clients()
+ .iter()
+ .map(|c| c.session.client_id)
+ .collect();
+
+ let potentially_stale: Vec<u32> = cg
+ .members
+ .iter()
+ .filter(|(_, m)| {
+                !m.partitions.is_empty() && !current_valid_ids.contains(&m.client_id)
+ })
+ .map(|(_, m)| m.client_id)
+ .collect();
+
+ if !potentially_stale.is_empty() {
+ tracing::info!(
+                "join_consumer_group: new member {client_id} has no partitions, found stale members: {potentially_stale:?}, forcing leave"
+ );
+
+ for stale_client_id in potentially_stale {
+ let _ =
+ self.writer()
+                        .leave_consumer_group(stream, topic, group, stale_client_id);
+ }
+ }
+ }
self.client_manager
.join_consumer_group(client_id, stream, topic, group)
diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs
index 961011bf8..6e3129515 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -69,16 +69,28 @@ impl IggyShard {
) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
-        let Some((polling_consumer, partition_id)) = self.resolve_consumer_with_partition_id(
- stream_id,
- topic_id,
- &consumer,
- client_id,
- partition_id,
- false,
- )?
- else {
- return Err(IggyError::NotResolvedConsumer(consumer.id));
+ let (polling_consumer, partition_id) = match consumer.kind {
+ ConsumerKind::Consumer => {
+ let Some((polling_consumer, partition_id)) = self
+ .resolve_consumer_with_partition_id(
+ stream_id,
+ topic_id,
+ &consumer,
+ client_id,
+ partition_id,
+ false,
+ )?
+ else {
+                    return Err(IggyError::NotResolvedConsumer(consumer.id.clone()));
+ };
+ (polling_consumer, partition_id)
+ }
+ ConsumerKind::ConsumerGroup => {
+ let (_, _, cg_id) =
+                    self.resolve_consumer_group_id(stream_id, topic_id, &consumer.id)?;
+ let partition_id = partition_id.unwrap_or(0) as usize;
+ (PollingConsumer::consumer_group(cg_id, 0), partition_id)
+ }
};
self.ensure_partition_exists(stream_id, topic_id, partition_id)?;
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 008495743..616c8c7c8 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -414,12 +414,6 @@ impl IggyShard {
(item.offset.load(Ordering::Relaxed), item.path.clone())
}
PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
- tracing::trace!(
- "Auto-committing offset {} for consumer group {} on
partition {:?}",
- offset,
- consumer_group_id.0,
- namespace
- );
let hdl = partition.consumer_group_offsets.pin();
let item = hdl.get_or_insert(
consumer_group_id,