This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch fix_cg_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/fix_cg_tests by this push:
new 0dd4ae684 fixes
0dd4ae684 is described below
commit 0dd4ae684debf1aef64ac3a3b6cfca6f432f8b40
Author: numminex <[email protected]>
AuthorDate: Mon Oct 13 15:09:59 2025 +0200
fixes
---
...h_multiple_clients_polling_messages_scenario.rs | 27 ++++------------------
1 file changed, 4 insertions(+), 23 deletions(-)
diff --git a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index 14bdced37..ec52814c7 100644
--- a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -34,13 +34,11 @@ pub async fn run(client_factory: &dyn ClientFactory) {
login_root(&system_client).await;
init_system(&system_client, &client1, &client2, &client3, true).await;
execute_using_messages_key_key(&system_client, &client1, &client2, &client3).await;
- /*
cleanup(&system_client, false).await;
init_system(&system_client, &client1, &client2, &client3, false).await;
execute_using_none_key(&system_client, &client1, &client2, &client3).await;
cleanup(&system_client, true).await;
assert_clean_system(&system_client).await;
- */
}
async fn init_system(
@@ -192,24 +190,13 @@ async fn execute_using_none_key(
}
// 2. Poll the messages for each client per assigned partition in the consumer group
- validate_message_polling(client1, &consumer_group_info).await;
- validate_message_polling(client2, &consumer_group_info).await;
- validate_message_polling(client3, &consumer_group_info).await;
+ validate_message_polling(client1).await;
+ validate_message_polling(client2).await;
+ validate_message_polling(client3).await;
}
-async fn validate_message_polling(client: &IggyClient, consumer_group: &ConsumerGroupDetails) {
+async fn validate_message_polling(client: &IggyClient) {
let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
- let client_info = client.get_me().await.unwrap();
- let consumer_group_member = consumer_group
- .members
- .iter()
- .find(|m| m.id == client_info.client_id)
- .unwrap();
- let partition_id = consumer_group_member.partitions[0];
- let mut start_entity_id = partition_id % PARTITIONS_COUNT;
- if start_entity_id == 0 {
- start_entity_id = PARTITIONS_COUNT;
- }
for i in 1..=MESSAGES_COUNT {
let polled_messages = client
@@ -228,12 +215,6 @@ async fn validate_message_polling(client: &IggyClient, consumer_group: &Consumer
let message = &polled_messages.messages[0];
let offset = (i - 1) as u64;
assert_eq!(message.header.offset, offset);
- let entity_id = start_entity_id + ((i - 1) * PARTITIONS_COUNT);
- let payload = from_utf8(&message.payload).unwrap();
- assert_eq!(
- payload,
- &create_extended_message_payload(partition_id, entity_id)
- );
}
let polled_messages = client