This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db336c3ff82 MINOR: Address flaky wait for producer acks logging for 
checking when not al… (#20957)
db336c3ff82 is described below

commit db336c3ff8287da6a9db84f727d1623885b1ac8a
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 25 15:23:19 2025 -0500

    MINOR: Address flaky wait for producer acks logging for checking when not 
al… (#20957)
    
    Adds the following changes to some system tests:
    - Increased timeout for `await_produced_messages`
    - Added more logging for `test_fencing_consumer`, which has exhibited
    flakiness that is hard to reproduce
    
    Reviewers: Lianet Magrans <[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py       | 26 ++++++++++++++++++++++++++
 tests/kafkatest/tests/client/consumer_test.py | 26 ++++++++++++++++++--------
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index c80197c3a5a..41227716b9f 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -2012,3 +2012,29 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
     def java_class_name(self):
         return "kafka\.Kafka"
+
+    def describe_consumer_group_members(self, group, node=None, 
command_config=None):
+        """ Describe a consumer group.
+        """
+        if node is None:
+            node = self.nodes[0]
+        consumer_group_script = self.path.script("kafka-consumer-groups.sh", 
node)
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+               (consumer_group_script,
+                self.bootstrap_servers(self.security_protocol),
+                command_config, group)
+
+        cmd += " --members"
+
+        output_lines = []
+        for line in node.account.ssh_capture(cmd):
+            if not (line.startswith("SLF4J") or line.startswith("GROUP") or 
line.strip() == ""):
+                output_lines.append(line.strip())
+        return output_lines
diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index 7a8614a1cd9..c28cba1431b 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -306,7 +306,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         producer = self.setup_producer(self.TOPIC)
 
         producer.start()
-        self.await_produced_messages(producer)
+        self.await_produced_messages(producer, timeout_sec=120)
 
         consumer = self.setup_consumer(self.TOPIC, static_membership=True, 
group_protocol=group_protocol)
 
@@ -333,7 +333,12 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 # Consumer protocol: Existing members should remain active and 
new conflicting ones should not be able to join.
                 self.await_consumed_messages(consumer)
                 assert num_rebalances == consumer.num_rebalances(), "Static 
consumers attempt to join with instance id in use should not cause a rebalance"
-                assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                try:
+                    assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                except AssertionError:
+                    self.logger.debug("All members not in group %s. Describe 
output is %s", self.group_id,
+                                      " 
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+                    raise
                 assert len(conflict_consumer.joined_nodes()) == 0
 
                 # Conflict consumers will terminate due to a fatal 
UnreleasedInstanceIdException error.
@@ -344,16 +349,21 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == 
len(consumer.nodes),
                            timeout_sec=60,
-                           err_msg="Timed out waiting for the consumer to 
shutdown")
+                           err_msg="Timed out waiting for the consumer to 
shutdown. Describe output is %s" % " 
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+
                 # Wait until the group becomes empty to ensure the instance ID 
is released.
-                # We use the 50-second timeout because the consumer session 
timeout is 45 seconds.
+                # We use the 60-second timeout because the consumer session 
timeout is 45 seconds adding some time for latency.
                 wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(state="empty"),
-                           timeout_sec=50,
-                           err_msg="Timed out waiting for the consumers to be 
removed from the group.")
+                           timeout_sec=60,
+                           err_msg="Timed out waiting for the consumers to be 
removed from the group. Describe output is %s." % " 
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+
                 conflict_consumer.start()
-                self.await_members(conflict_consumer, num_conflict_consumers)
+                try:
+                    self.await_members(conflict_consumer, 
num_conflict_consumers)
+                except TimeoutError:
+                    self.logger.debug("All conflict members not in group %s. 
Describe output is %s", self.group_id, " 
".join(self.kafka.describe_consumer_group_members(self.group_id)))
+                    raise
 
-            
         else:
             consumer.start()
             conflict_consumer.start()

Reply via email to