This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 9a893b36e66 MINOR: Address flaky wait for producer acks logging for
checking when not al… (#20957)
9a893b36e66 is described below
commit 9a893b36e66bb4e89d03d84f338d1f1e715ec6f9
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 25 15:23:19 2025 -0500
MINOR: Address flaky wait for producer acks logging for checking when not
al… (#20957)
This commit makes the following changes to some system tests:
- Increased the timeout for `await_produced_messages`
- Added more logging for `test_fencing_consumer`, which has exhibited
flakiness and is hard to reproduce
Reviewers: Lianet Magrans <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 26 ++++++++++++++++++++++++++
tests/kafkatest/tests/client/consumer_test.py | 26 ++++++++++++++++++--------
2 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index c80197c3a5a..41227716b9f 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -2012,3 +2012,29 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
def java_class_name(self):
return "kafka\.Kafka"
+
+ def describe_consumer_group_members(self, group, node=None,
command_config=None):
+ """ Describe a consumer group.
+ """
+ if node is None:
+ node = self.nodes[0]
+ consumer_group_script = self.path.script("kafka-consumer-groups.sh",
node)
+
+ if command_config is None:
+ command_config = ""
+ else:
+ command_config = "--command-config " + command_config
+
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+ (consumer_group_script,
+ self.bootstrap_servers(self.security_protocol),
+ command_config, group)
+
+ cmd += " --members"
+
+ output_lines = []
+ for line in node.account.ssh_capture(cmd):
+ if not (line.startswith("SLF4J") or line.startswith("GROUP") or
line.strip() == ""):
+ output_lines.append(line.strip())
+ return output_lines
diff --git a/tests/kafkatest/tests/client/consumer_test.py
b/tests/kafkatest/tests/client/consumer_test.py
index 7a8614a1cd9..c28cba1431b 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -306,7 +306,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
producer = self.setup_producer(self.TOPIC)
producer.start()
- self.await_produced_messages(producer)
+ self.await_produced_messages(producer, timeout_sec=120)
consumer = self.setup_consumer(self.TOPIC, static_membership=True,
group_protocol=group_protocol)
@@ -333,7 +333,12 @@ class OffsetValidationTest(VerifiableConsumerTest):
# Consumer protocol: Existing members should remain active and
new conflicting ones should not be able to join.
self.await_consumed_messages(consumer)
assert num_rebalances == consumer.num_rebalances(), "Static
consumers attempt to join with instance id in use should not cause a rebalance"
- assert len(consumer.joined_nodes()) == len(consumer.nodes)
+ try:
+ assert len(consumer.joined_nodes()) == len(consumer.nodes)
+ except AssertionError:
+ self.logger.debug("All members not in group %s. Describe
output is %s", self.group_id,
+ "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+ raise
assert len(conflict_consumer.joined_nodes()) == 0
# Conflict consumers will terminate due to a fatal
UnreleasedInstanceIdException error.
@@ -344,16 +349,21 @@ class OffsetValidationTest(VerifiableConsumerTest):
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) ==
len(consumer.nodes),
timeout_sec=60,
- err_msg="Timed out waiting for the consumer to
shutdown")
+ err_msg="Timed out waiting for the consumer to
shutdown. Describe output is %s" % "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+
# Wait until the group becomes empty to ensure the instance ID
is released.
- # We use the 50-second timeout because the consumer session
timeout is 45 seconds.
+ # We use the 60-second timeout because the consumer session
timeout is 45 seconds adding some time for latency.
wait_until(lambda: self.group_id in
self.kafka.list_consumer_groups(state="empty"),
- timeout_sec=50,
- err_msg="Timed out waiting for the consumers to be
removed from the group.")
+ timeout_sec=60,
+ err_msg="Timed out waiting for the consumers to be
removed from the group. Describe output is %s." % "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+
conflict_consumer.start()
- self.await_members(conflict_consumer, num_conflict_consumers)
+ try:
+ self.await_members(conflict_consumer,
num_conflict_consumers)
+ except TimeoutError:
+ self.logger.debug("All conflict members not in group %s.
Describe output is %s", self.group_id, "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+ raise
-
else:
consumer.start()
conflict_consumer.start()